Fix behavior of Stream.empty.

The underlying subscription would invoke the `onDone` handler even
after a `cancel`.
It also forgot to register the `onDone` callback in the zone before
running it.

Tweaked the behavior of `pause` and `resume` to make sure they make
no difference after `cancel` or after a done event has been omitted.
(Test now checks that the behavior matches other streams.)

Fixes #53201

Bug: https://dartbug.com/53201
Change-Id: Iba35be2c4b44b5c4ec97d5a4dbcd3aff7fee8b75
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/320561
Reviewed-by: Aske Simon Christensen <askesc@google.com>
Reviewed-by: Nate Bosch <nbosch@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Lasse Nielsen <lrn@google.com>
Reviewed-by: Stephen Adams <sra@google.com>
This commit is contained in:
Lasse R.H. Nielsen 2023-08-17 15:26:59 +00:00 committed by Commit Queue
parent f779ba3cb0
commit 4a38cb09f5
6 changed files with 414 additions and 150 deletions

View file

@ -18,6 +18,10 @@
### Libraries
#### `dart:async`
- Added `broadcast` parameter to `Stream.empty` constructor.
#### `dart:developer`
- Deprecated the `Service.getIsolateID` method.

View file

@ -4,8 +4,8 @@
/// This library is used for testing asynchronous tests.
/// If a test is asynchronous, it needs to notify the testing driver
/// about this (otherwise tests may get reported as passing [after main()
/// finished] even if the asynchronous operations fail).
/// about this (otherwise tests may get reported as passing after main()
/// finished even if the asynchronous operations fail).
///
/// This library provides four methods
/// - asyncStart(): Needs to be called before an asynchronous operation is
@ -16,10 +16,9 @@
/// - asyncTest(f()): Helper method that wraps a computation that returns a
/// Future with matching calls to asyncStart() and
/// asyncSuccess(_).
/// After the last asyncStart() called was matched with a corresponding
/// After every asyncStart() called is matched with a corresponding
/// asyncEnd() or asyncSuccess(_) call, the testing driver will be notified that
/// the tests is done.
library async_helper;
import 'dart:async';
@ -68,21 +67,25 @@ void asyncEnd() {
/// Call this after an asynchronous test has ended successfully. This is a helper
/// for calling [asyncEnd].
///
/// This method intentionally has a signature that matches [:Future.then:] as a
/// convenience for calling [asyncEnd] when a [:Future:] completes without error,
/// This method intentionally has a signature that matches `Future.then` as a
/// convenience for calling [asyncEnd] when a `Future` completes without error,
/// like this:
///
/// asyncStart();
/// Future result = test();
/// result.then(asyncSuccess);
void asyncSuccess(_) => asyncEnd();
/// ```dart
/// asyncStart();
/// Future result = test();
/// result.then(asyncSuccess);
/// ```
void asyncSuccess(void _) {
asyncEnd();
}
/// Helper method for performing asynchronous tests involving [:Future:].
/// Helper method for performing asynchronous tests involving `Future`.
///
/// [f] must return a [:Future:] for the test computation.
Future<void> asyncTest(Function() f) {
/// The function [test] must return a `Future` which completes without error
/// when the test is successful.
Future<void> asyncTest(Function() test) {
asyncStart();
return f().then(asyncSuccess);
return test().then(asyncSuccess);
}
/// Verifies that the asynchronous [result] throws a [T].
@ -109,7 +112,7 @@ Future<T> asyncExpectThrows<T extends Object>(Future<void> result,
// Handle null being passed in from legacy code
// while also avoiding producing an unnecessary null check warning here.
if ((reason as dynamic) == null) reason = "";
// Only include the type in he message if it's not a top-type.
// Only include the type in the message if it's not `Object`.
var type = Object() is! T ? "<$T>" : "";
return "asyncExpectThrows$type($reason):";
}

View file

@ -154,7 +154,15 @@ abstract mixin class Stream<T> {
/// },
/// );
/// ```
const factory Stream.empty() = _EmptyStream<T>;
///
/// The stream defaults to being a broadcast stream,
/// as reported by [isBroadcast].
/// This value can be changed by passing `false` as
/// the [broadcast] parameter, which defaults to `true`.
///
/// The stream can be listened to more than once,
/// whether it reports itself as broadcast or not.
const factory Stream.empty({@Since("3.2") bool broadcast}) = _EmptyStream<T>;
/// Creates a stream which emits a single data event before closing.
///

View file

@ -630,49 +630,97 @@ typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);
/// Done subscription that will send one done event as soon as possible.
class _DoneStreamSubscription<T> implements StreamSubscription<T> {
static const int _DONE_SENT = 1;
static const int _SCHEDULED = 2;
static const int _PAUSED = 4;
// States of the subscription.
//
// The subscription will try to send a done event.
// When created, it schedules a microtask to emit the
// event to the current [_onDone] callback.
// If paused when the microtask happens, the subscription won't
// send the done event. It will reschedule a new microtask when
// the subscription is resumed.
// If cancelled, the event will not be sent, and a new
// microtask won't be scheduled, and further pauses
// are ignored.
/// Sending a done event is allowed.
///
/// Not paused or cancelled, no microtask scheduled.
static const int _stateReadyToSend = 0;
/// Set when a microtask is scheduled, and not cancelled.
static const int _stateScheduled = 1;
/// State set when done event sent or subscription cancelled.
///
/// Any negative value counts as done, so we can safely subtract
/// [_stateScheduled] or [_statePausedOnce] without affecting
/// being cancelled.
///
/// Never considered paused in this state, may still be scheduled,
/// and [_onDone] is always set to `null`.
static const int _stateDone = -1;
/// Added for each pause while not done or cancelled, subtracted on resume.
///
/// Subscription is paused when state is at least `_statePausedOnce`.
static const int _statePausedOnce = 2;
int _state;
final Zone _zone;
int _state = 0;
void Function()? _onDone;
_DoneStreamSubscription(this._onDone) : _zone = Zone.current {
_schedule();
}
bool get _isSent => (_state & _DONE_SENT) != 0;
bool get _isScheduled => (_state & _SCHEDULED) != 0;
bool get isPaused => _state >= _PAUSED;
void _schedule() {
if (_isScheduled) return;
_zone.scheduleMicrotask(_sendDone);
_state |= _SCHEDULED;
}
void onData(void handleData(T data)?) {}
void onError(Function? handleError) {}
void onDone(void handleDone()?) {
_onDone = handleDone;
}
void pause([Future<void>? resumeSignal]) {
_state += _PAUSED;
if (resumeSignal != null) resumeSignal.whenComplete(resume);
}
void resume() {
if (isPaused) {
_state -= _PAUSED;
if (!isPaused && !_isSent) {
_schedule();
}
_DoneStreamSubscription(void Function()? onDone)
: _zone = Zone.current,
_state = _stateScheduled {
scheduleMicrotask(_onMicrotask);
if (onDone != null) {
_onDone = _zone.registerCallback(onDone);
}
}
Future cancel() => Future._nullFuture;
bool get isPaused => _state >= _statePausedOnce;
/// True after being cancelled, or after delivering done event.
static bool _isDone(int state) => state < 0;
static int _incrementPauseCount(int state) => state + _statePausedOnce;
static int _decrementPauseCount(int state) => state - _statePausedOnce;
void onData(void handleData(T data)?) {}
void onError(Function? handleError) {}
void onDone(void handleDone()?) {
if (!_isDone(_state)) {
if (handleDone != null) handleDone = _zone.registerCallback(handleDone);
_onDone = handleDone;
}
}
void pause([Future<void>? resumeSignal]) {
if (!_isDone(_state)) {
_state = _incrementPauseCount(_state);
if (resumeSignal != null) resumeSignal.whenComplete(resume);
}
}
void resume() {
var resumeState = _decrementPauseCount(_state);
if (resumeState < 0) return; // Wasn't paused.
if (resumeState == _stateReadyToSend) {
// No longer paused, and not already scheduled.
_state = _stateScheduled;
scheduleMicrotask(_onMicrotask);
} else {
_state = resumeState;
}
}
Future cancel() {
_state = _stateDone;
_onDone = null;
return Future._nullFuture;
}
Future<E> asFuture<E>([E? futureValue]) {
E resultValue;
@ -684,19 +732,28 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
} else {
resultValue = futureValue;
}
_Future<E> result = new _Future<E>();
_onDone = () {
result._completeWithValue(resultValue);
};
_Future<E> result = _Future<E>();
if (!_isDone(_state)) {
_onDone = _zone.registerCallback(() {
result._completeWithValue(resultValue);
});
}
return result;
}
void _sendDone() {
_state &= ~_SCHEDULED;
if (isPaused) return;
_state |= _DONE_SENT;
var doneHandler = _onDone;
if (doneHandler != null) _zone.runGuarded(doneHandler);
void _onMicrotask() {
var unscheduledState = _state - _stateScheduled;
if (unscheduledState == _stateReadyToSend) {
// Send the done event.
_state = _stateDone;
if (_onDone case var doneHandler?) {
_onDone = null;
_zone.runGuarded(doneHandler);
}
} else {
// Paused or cancelled.
_state = unscheduledState;
}
}
}
@ -1007,11 +1064,11 @@ class _StreamIterator<T> implements StreamIterator<T> {
/// An empty broadcast stream, sending a done event as soon as possible.
class _EmptyStream<T> extends Stream<T> {
const _EmptyStream();
bool get isBroadcast => true;
StreamSubscription<T> listen(void onData(T data)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
return new _DoneStreamSubscription<T>(onDone);
const _EmptyStream({bool broadcast = true}) : isBroadcast = broadcast;
final bool isBroadcast;
StreamSubscription<T> listen(void Function(T data)? onData,
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
return _DoneStreamSubscription<T>(onDone);
}
}

View file

@ -7,62 +7,158 @@ import "package:expect/expect.dart";
import "dart:async";
import 'package:async_helper/async_helper.dart';
main() {
asyncStart();
runTest().whenComplete(asyncEnd);
main() async {
await asyncTest(() async {
// Is a `const` constructor.
// Can be called with optional boolean to say whether broadcast or not.
await asyncTest(() => emptyTest(const Stream<int>.empty(), true));
await asyncTest(
() => emptyTest(const Stream<int>.empty(broadcast: true), true));
await asyncTest(
() => emptyTest(const Stream<int>.empty(broadcast: false), false));
// Check that the behavior is consistent with other empty multi-subscription
// streams.
await asyncTest(() => emptyTest(Stream<int>.fromIterable(<int>[]), false));
await asyncTest(() =>
emptyTest((StreamController<int>.broadcast()..close()).stream, true));
await asyncTest(
() => emptyTest(Stream<int>.multi((c) => c.close()), false));
});
await flushMicrotasks();
}
Future runTest() async {
unreachable([a, b]) {
throw "UNREACHABLE";
// Function which fails the test if it gets called.
void unreachable([a, b]) {
Expect.fail("Unreachable");
}
/// Check that something happens once.
class Checker {
bool checked = false;
void check() {
if (checked) Expect.fail("Checked more than once");
checked = true;
}
}
int tick = 0;
ticker() {
tick++;
}
Future<void> emptyTest(Stream<int> s, bool broadcast) async {
var checker = Checker();
// Respects type parameter (not a `Stream<Never>`)
Expect.notType<Stream<String>>(s);
asyncStart();
// Has the expected `isBroadcast`.
Expect.equals(broadcast, s.isBroadcast);
Stream<int> s = const Stream<int>.empty(); // Is const constructor.
Expect.isFalse(s is Stream<String>); // Respects type parameter.
StreamSubscription<int> sub =
s.listen(unreachable, onError: unreachable, onDone: ticker);
Expect.isFalse(sub is StreamSubscription<String>); // Type parameter in sub.
s.listen(unreachable, onError: unreachable, onDone: checker.check);
// Type parameter of subscription repspects stream.
// Not a `StreamSubscription<Never>`.
Expect.isFalse(sub is StreamSubscription<String>);
// Doesn't do callback in response to listen.
Expect.equals(tick, 0);
Expect.isFalse(sub.isPaused);
// Doesn't do callback immediately in response to listen.
Expect.isFalse(checker.checked);
await flushMicrotasks();
// Completes eventually.
Expect.equals(tick, 1);
// Completes eventually, after a microtask.
Expect.isTrue(checker.checked);
Expect.isFalse(sub.isPaused);
// It's a broadcast stream - can listen twice.
Expect.isTrue(s.isBroadcast);
// Can listen more than once, whether broadcast stream or not.
checker = Checker();
StreamSubscription<int> sub2 =
s.listen(unreachable, onError: unreachable, onDone: unreachable);
// respects pause.
s.listen(unreachable, onError: unreachable, onDone: checker.check);
// Respects pauses.
sub2.pause();
await flushMicrotasks();
// respects cancel.
sub2.cancel();
await flushMicrotasks();
Expect.equals(tick, 1);
// Still not complete.
Expect.isTrue(sub2.isPaused);
sub2.pause();
Expect.isTrue(sub2.isPaused);
sub2.pause(Future<int>.value(0));
Expect.isTrue(sub2.isPaused);
StreamSubscription<int> sub3 =
s.listen(unreachable, onError: unreachable, onDone: ticker);
// respects pause.
await flushMicrotasks();
Expect.isTrue(sub2.isPaused);
Expect.isFalse(checker.checked);
// Resumes when all pauses resumed.
sub2.resume();
Expect.isTrue(sub2.isPaused);
await flushMicrotasks();
Expect.isFalse(checker.checked);
Expect.isTrue(sub2.isPaused);
sub2.resume();
Expect.isFalse(sub2.isPaused);
await flushMicrotasks();
Expect.isTrue(checker.checked);
Expect.isFalse(sub2.isPaused);
// Can't pause after done.
sub2.pause(Future<int>.value(0));
Expect.isFalse(sub2.isPaused);
sub2.pause();
Expect.isFalse(sub2.isPaused);
// Respects cancel.
var sub3 = s.listen(unreachable, onError: unreachable, onDone: unreachable);
sub3.cancel();
sub3.onDone(unreachable);
Expect.isFalse(sub3.isPaused);
await flushMicrotasks();
Expect.isFalse(sub3.isPaused);
// Can't pause after cancel
sub3.pause();
Expect.equals(tick, 1);
await flushMicrotasks();
// Doesn't complete while paused.
Expect.equals(tick, 1);
sub3.resume();
await flushMicrotasks();
// Now completed.
Expect.equals(tick, 2);
Expect.isFalse(sub3.isPaused);
sub3.pause(Future<int>.value(0));
Expect.isFalse(sub3.isPaused);
// No errors.
asyncEnd();
// Respects cancel while paused.
var sub4 = s.listen(unreachable, onError: unreachable, onDone: unreachable);
sub4.pause();
sub4.cancel();
sub4.onDone(unreachable);
await flushMicrotasks();
sub4.resume();
await flushMicrotasks();
// Check that the stream is zone-aware.
// Registers onDone callback.
var log = [];
late final Zone zone;
void callback1() {
// Run in correct zone.
Expect.equals(zone, Zone.current);
log.add("don1");
}
void callback2() {
// Run in correct zone.
Expect.equals(zone, Zone.current);
log.add("don2");
}
zone = Zone.current.fork(
specification: ZoneSpecification(registerCallback: <R>(s, p, z, f) {
if (f == callback1) log.add("reg1");
if (f == callback2) log.add("reg2");
return p.registerCallback<R>(z, f);
}, run: <R>(s, p, z, f) {
if (f == callback1) log.add("run1");
if (f == callback2) log.add("run2");
return p.run<R>(z, f);
}));
await zone.run(() async {
var s = Stream<int>.empty();
var sub = s.listen(unreachable, onError: unreachable, onDone: callback1);
sub.onDone(callback2);
});
await flushMicrotasks();
Expect.listEquals(["reg1", "reg2", "run2", "don2"], log);
}
Future flushMicrotasks() => new Future.delayed(Duration.zero);

View file

@ -9,62 +9,158 @@ import "package:expect/expect.dart";
import "dart:async";
import 'package:async_helper/async_helper.dart';
main() {
asyncStart();
runTest().whenComplete(asyncEnd);
main() async {
await asyncTest(() async {
// Is a `const` constructor.
// Can be called with optional boolean to say whether broadcast or not.
await asyncTest(() => emptyTest(const Stream<int>.empty(), true));
await asyncTest(
() => emptyTest(const Stream<int>.empty(broadcast: true), true));
await asyncTest(
() => emptyTest(const Stream<int>.empty(broadcast: false), false));
// Check that the behavior is consistent with other empty multi-subscription
// streams.
await asyncTest(() => emptyTest(Stream<int>.fromIterable(<int>[]), false));
await asyncTest(() =>
emptyTest((StreamController<int>.broadcast()..close()).stream, true));
await asyncTest(
() => emptyTest(Stream<int>.multi((c) => c.close()), false));
});
await flushMicrotasks();
}
Future runTest() async {
unreachable([a, b]) {
throw "UNREACHABLE";
// Function which fails the test if it gets called.
void unreachable([a, b]) {
Expect.fail("Unreachable");
}
/// Check that something happens once.
class Checker {
bool checked = false;
void check() {
if (checked) Expect.fail("Checked more than once");
checked = true;
}
}
int tick = 0;
ticker() {
tick++;
}
Future<void> emptyTest(Stream<int> s, bool broadcast) async {
var checker = Checker();
// Respects type parameter (not a `Stream<Never>`)
Expect.notType<Stream<String>>(s);
asyncStart();
// Has the expected `isBroadcast`.
Expect.equals(broadcast, s.isBroadcast);
Stream<int> s = const Stream<int>.empty(); // Is const constructor.
Expect.isFalse(s is Stream<String>); // Respects type parameter.
StreamSubscription<int> sub =
s.listen(unreachable, onError: unreachable, onDone: ticker);
Expect.isFalse(sub is StreamSubscription<String>); // Type parameter in sub.
s.listen(unreachable, onError: unreachable, onDone: checker.check);
// Type parameter of subscription repspects stream.
// Not a `StreamSubscription<Never>`.
Expect.isFalse(sub is StreamSubscription<String>);
// Doesn't do callback in response to listen.
Expect.equals(tick, 0);
Expect.isFalse(sub.isPaused);
// Doesn't do callback immediately in response to listen.
Expect.isFalse(checker.checked);
await flushMicrotasks();
// Completes eventually.
Expect.equals(tick, 1);
// Completes eventually, after a microtask.
Expect.isTrue(checker.checked);
Expect.isFalse(sub.isPaused);
// It's a broadcast stream - can listen twice.
Expect.isTrue(s.isBroadcast);
// Can listen more than once, whether broadcast stream or not.
checker = Checker();
StreamSubscription<int> sub2 =
s.listen(unreachable, onError: unreachable, onDone: unreachable);
// respects pause.
s.listen(unreachable, onError: unreachable, onDone: checker.check);
// Respects pauses.
sub2.pause();
await flushMicrotasks();
// respects cancel.
sub2.cancel();
await flushMicrotasks();
Expect.equals(tick, 1);
// Still not complete.
Expect.isTrue(sub2.isPaused);
sub2.pause();
Expect.isTrue(sub2.isPaused);
sub2.pause(Future<int>.value(0));
Expect.isTrue(sub2.isPaused);
StreamSubscription<int> sub3 =
s.listen(unreachable, onError: unreachable, onDone: ticker);
// respects pause.
await flushMicrotasks();
Expect.isTrue(sub2.isPaused);
Expect.isFalse(checker.checked);
// Resumes when all pauses resumed.
sub2.resume();
Expect.isTrue(sub2.isPaused);
await flushMicrotasks();
Expect.isFalse(checker.checked);
Expect.isTrue(sub2.isPaused);
sub2.resume();
Expect.isFalse(sub2.isPaused);
await flushMicrotasks();
Expect.isTrue(checker.checked);
Expect.isFalse(sub2.isPaused);
// Can't pause after done.
sub2.pause(Future<int>.value(0));
Expect.isFalse(sub2.isPaused);
sub2.pause();
Expect.isFalse(sub2.isPaused);
// Respects cancel.
var sub3 = s.listen(unreachable, onError: unreachable, onDone: unreachable);
sub3.cancel();
sub3.onDone(unreachable);
Expect.isFalse(sub3.isPaused);
await flushMicrotasks();
Expect.isFalse(sub3.isPaused);
// Can't pause after cancel
sub3.pause();
Expect.equals(tick, 1);
await flushMicrotasks();
// Doesn't complete while paused.
Expect.equals(tick, 1);
sub3.resume();
await flushMicrotasks();
// Now completed.
Expect.equals(tick, 2);
Expect.isFalse(sub3.isPaused);
sub3.pause(Future<int>.value(0));
Expect.isFalse(sub3.isPaused);
// No errors.
asyncEnd();
// Respects cancel while paused.
var sub4 = s.listen(unreachable, onError: unreachable, onDone: unreachable);
sub4.pause();
sub4.cancel();
sub4.onDone(unreachable);
await flushMicrotasks();
sub4.resume();
await flushMicrotasks();
// Check that the stream is zone-aware.
// Registers onDone callback.
var log = [];
Zone zone;
void callback1() {
// Run in correct zone.
Expect.equals(zone, Zone.current);
log.add("don1");
}
void callback2() {
// Run in correct zone.
Expect.equals(zone, Zone.current);
log.add("don2");
}
zone = Zone.current.fork(
specification: ZoneSpecification(registerCallback: <R>(s, p, z, f) {
if (f == callback1) log.add("reg1");
if (f == callback2) log.add("reg2");
return p.registerCallback<R>(z, f);
}, run: <R>(s, p, z, f) {
if (f == callback1) log.add("run1");
if (f == callback2) log.add("run2");
return p.run<R>(z, f);
}));
await zone.run(() async {
var s = Stream<int>.empty();
var sub = s.listen(unreachable, onError: unreachable, onDone: callback1);
sub.onDone(callback2);
});
await flushMicrotasks();
Expect.listEquals(["reg1", "reg2", "run2", "don2"], log);
}
Future flushMicrotasks() => new Future.delayed(Duration.zero);