diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart index e0a2ce28756..b0ee0ae9807 100644 --- a/sdk/lib/async/stream.dart +++ b/sdk/lib/async/stream.dart @@ -2344,6 +2344,8 @@ abstract class MultiStreamController implements StreamController { * Delivery can be delayed if other previously added events are * still pending delivery, if the subscription is paused, * or if the subscription isn't listening yet. + * If it's necessary to know whether the "done" event has been delievered, + * [done] future will complete when that has happend. */ void closeSync(); } diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart index 942f4a7455d..52e5ca3d9e8 100644 --- a/sdk/lib/async/stream_impl.dart +++ b/sdk/lib/async/stream_impl.dart @@ -1122,15 +1122,22 @@ class _MultiStreamController extends _AsyncStreamController _MultiStreamController() : super(null, null, null, null); void addSync(T data) { - _subscription._add(data); + if (!_mayAddEvent) throw _badEventState(); + if (hasListener) _subscription._add(data); } void addErrorSync(Object error, [StackTrace? stackTrace]) { - _subscription._addError(error, stackTrace ?? StackTrace.empty); + if (!_mayAddEvent) throw _badEventState(); + if (hasListener) { + _subscription._addError(error, stackTrace ?? StackTrace.empty); + } } void closeSync() { - _subscription._close(); + if (isClosed) return; + if (!_mayAddEvent) throw _badEventState(); + _state |= _StreamController._STATE_CLOSED; + if (hasListener) _subscription._close(); } Stream get stream { diff --git a/tests/lib/async/stream_multi_test.dart b/tests/lib/async/stream_multi_test.dart index e57c4f4ddc7..b2beb44ede1 100644 --- a/tests/lib/async/stream_multi_test.dart +++ b/tests/lib/async/stream_multi_test.dart @@ -44,6 +44,7 @@ void main() { testStreamsIndependent(); asyncTest(testStreamNonOverlap); asyncTest(testRepeatLatest); + asyncTest(testIncorrectUse); asyncEnd(); } @@ -138,3 +139,79 @@ Future testRepeatLatest() async { var l3 = await f3; Expect.listEquals([2, 3], l3); } + +// Test that errors are thrown when required, +// and use after cancel is ignored. +Future testIncorrectUse() async { + { + var lock = Completer(); + var lock2 = Completer(); + var stream = Stream.multi((c) async { + c.add(2); + await lock.future; + Expect.isTrue(!c.hasListener); + c.add(2); + c.addError("Error"); + c.close(); + // No adding after close. + Expect.throws(() => c.add(3)); + Expect.throws(() => c.addSync(3)); + Expect.throws(() => c.addError("E")); + Expect.throws(() => c.addErrorSync("E")); + Expect.throws(() => c.addStream(Stream.empty())); + lock2.complete(); + }); + await for (var v in stream) { + Expect.equals(2, v); + break; // Cancels subscription. + } + lock.complete(); + await lock2.future; + } + + { + var lock = Completer(); + var lock2 = Completer(); + var stream = Stream.multi((c) async { + c.add(2); + await lock.future; + Expect.isTrue(!c.hasListener); + c.addSync(2); + c.addErrorSync("Error"); + c.closeSync(); + // No adding after close. + Expect.throws(() => c.add(3)); + Expect.throws(() => c.addSync(3)); + Expect.throws(() => c.addError("E")); + Expect.throws(() => c.addErrorSync("E")); + Expect.throws(() => c.addStream(Stream.empty())); + lock2.complete(); + }); + await for (var v in stream) { + Expect.equals(2, v); + break; // Cancels subscription. + } + lock.complete(); + await lock2.future; + } + + { + var stream = Stream.multi((c) async { + var c2 = StreamController(); + c.addStream(c2.stream); + // Now adding stream, cannot add events at the same time (for now!). + Expect.throws(() => c.add(1)); + Expect.throws(() => c.addSync(1)); + Expect.throws(() => c.addError("Error")); + Expect.throws(() => c.addErrorSync("Error")); + Expect.throws(() => c.close()); + Expect.throws(() => c.closeSync()); + await c2.close(); + c.add(42); + c.close(); + }); + await for (var v in stream) { + Expect.equals(42, v); + } + } +}