mirror of
https://github.com/dart-lang/sdk
synced 2024-10-14 11:03:19 +00:00
Fix bug in Stream.multi.
The sync operations on a MultiStreamController did not check whether sending events at the current time was allowed. That could lead to `null` dereferencing errors when doing operations on the controller after a cancel, and could cause events to appear out of order. Change-Id: I06b86a78959dfcaa402f74e2980a9d515f097dc9 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/159442 Reviewed-by: Erik Ernst <eernst@google.com> Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
This commit is contained in:
parent
70914a7164
commit
0d322488ae
|
@ -2344,6 +2344,8 @@ abstract class MultiStreamController<T> implements StreamController<T> {
|
||||||
* Delivery can be delayed if other previously added events are
|
* Delivery can be delayed if other previously added events are
|
||||||
* still pending delivery, if the subscription is paused,
|
* still pending delivery, if the subscription is paused,
|
||||||
* or if the subscription isn't listening yet.
|
* or if the subscription isn't listening yet.
|
||||||
|
* If it's necessary to know whether the "done" event has been delievered,
|
||||||
|
* [done] future will complete when that has happend.
|
||||||
*/
|
*/
|
||||||
void closeSync();
|
void closeSync();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1122,15 +1122,22 @@ class _MultiStreamController<T> extends _AsyncStreamController<T>
|
||||||
_MultiStreamController() : super(null, null, null, null);
|
_MultiStreamController() : super(null, null, null, null);
|
||||||
|
|
||||||
void addSync(T data) {
|
void addSync(T data) {
|
||||||
_subscription._add(data);
|
if (!_mayAddEvent) throw _badEventState();
|
||||||
|
if (hasListener) _subscription._add(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addErrorSync(Object error, [StackTrace? stackTrace]) {
|
void addErrorSync(Object error, [StackTrace? stackTrace]) {
|
||||||
_subscription._addError(error, stackTrace ?? StackTrace.empty);
|
if (!_mayAddEvent) throw _badEventState();
|
||||||
|
if (hasListener) {
|
||||||
|
_subscription._addError(error, stackTrace ?? StackTrace.empty);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void closeSync() {
|
void closeSync() {
|
||||||
_subscription._close();
|
if (isClosed) return;
|
||||||
|
if (!_mayAddEvent) throw _badEventState();
|
||||||
|
_state |= _StreamController._STATE_CLOSED;
|
||||||
|
if (hasListener) _subscription._close();
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream<T> get stream {
|
Stream<T> get stream {
|
||||||
|
|
|
@ -44,6 +44,7 @@ void main() {
|
||||||
testStreamsIndependent();
|
testStreamsIndependent();
|
||||||
asyncTest(testStreamNonOverlap);
|
asyncTest(testStreamNonOverlap);
|
||||||
asyncTest(testRepeatLatest);
|
asyncTest(testRepeatLatest);
|
||||||
|
asyncTest(testIncorrectUse);
|
||||||
asyncEnd();
|
asyncEnd();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -138,3 +139,79 @@ Future<void> testRepeatLatest() async {
|
||||||
var l3 = await f3;
|
var l3 = await f3;
|
||||||
Expect.listEquals([2, 3], l3);
|
Expect.listEquals([2, 3], l3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that errors are thrown when required,
|
||||||
|
// and use after cancel is ignored.
|
||||||
|
Future<void> testIncorrectUse() async {
|
||||||
|
{
|
||||||
|
var lock = Completer();
|
||||||
|
var lock2 = Completer();
|
||||||
|
var stream = Stream.multi((c) async {
|
||||||
|
c.add(2);
|
||||||
|
await lock.future;
|
||||||
|
Expect.isTrue(!c.hasListener);
|
||||||
|
c.add(2);
|
||||||
|
c.addError("Error");
|
||||||
|
c.close();
|
||||||
|
// No adding after close.
|
||||||
|
Expect.throws<StateError>(() => c.add(3));
|
||||||
|
Expect.throws<StateError>(() => c.addSync(3));
|
||||||
|
Expect.throws<StateError>(() => c.addError("E"));
|
||||||
|
Expect.throws<StateError>(() => c.addErrorSync("E"));
|
||||||
|
Expect.throws<StateError>(() => c.addStream(Stream.empty()));
|
||||||
|
lock2.complete();
|
||||||
|
});
|
||||||
|
await for (var v in stream) {
|
||||||
|
Expect.equals(2, v);
|
||||||
|
break; // Cancels subscription.
|
||||||
|
}
|
||||||
|
lock.complete();
|
||||||
|
await lock2.future;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var lock = Completer();
|
||||||
|
var lock2 = Completer();
|
||||||
|
var stream = Stream.multi((c) async {
|
||||||
|
c.add(2);
|
||||||
|
await lock.future;
|
||||||
|
Expect.isTrue(!c.hasListener);
|
||||||
|
c.addSync(2);
|
||||||
|
c.addErrorSync("Error");
|
||||||
|
c.closeSync();
|
||||||
|
// No adding after close.
|
||||||
|
Expect.throws<StateError>(() => c.add(3));
|
||||||
|
Expect.throws<StateError>(() => c.addSync(3));
|
||||||
|
Expect.throws<StateError>(() => c.addError("E"));
|
||||||
|
Expect.throws<StateError>(() => c.addErrorSync("E"));
|
||||||
|
Expect.throws<StateError>(() => c.addStream(Stream.empty()));
|
||||||
|
lock2.complete();
|
||||||
|
});
|
||||||
|
await for (var v in stream) {
|
||||||
|
Expect.equals(2, v);
|
||||||
|
break; // Cancels subscription.
|
||||||
|
}
|
||||||
|
lock.complete();
|
||||||
|
await lock2.future;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
var stream = Stream.multi((c) async {
|
||||||
|
var c2 = StreamController();
|
||||||
|
c.addStream(c2.stream);
|
||||||
|
// Now adding stream, cannot add events at the same time (for now!).
|
||||||
|
Expect.throws<StateError>(() => c.add(1));
|
||||||
|
Expect.throws<StateError>(() => c.addSync(1));
|
||||||
|
Expect.throws<StateError>(() => c.addError("Error"));
|
||||||
|
Expect.throws<StateError>(() => c.addErrorSync("Error"));
|
||||||
|
Expect.throws<StateError>(() => c.close());
|
||||||
|
Expect.throws<StateError>(() => c.closeSync());
|
||||||
|
await c2.close();
|
||||||
|
c.add(42);
|
||||||
|
c.close();
|
||||||
|
});
|
||||||
|
await for (var v in stream) {
|
||||||
|
Expect.equals(42, v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue