From 17d0ba55e85f16794a65d67cb4544665e3467053 Mon Sep 17 00:00:00 2001 From: Lasse Reichstein Holst Nielsen Date: Mon, 29 Jun 2020 14:04:53 +0000 Subject: [PATCH] Add `Stream.multi` constructor. A generalized stream which provides a controller for each listener. Can be used to implement both broadcast streams and single subscription streams, as well as any stream behavior between the two. Change-Id: I7a75f8736ca6bc91ce266e768db68536efd24dfe Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/150936 Commit-Queue: Lasse R.H. Nielsen Reviewed-by: Nate Bosch --- CHANGELOG.md | 41 +++---- sdk/lib/async/async.dart | 1 + sdk/lib/async/stream.dart | 117 +++++++++++++++++++ sdk/lib/async/stream_controller.dart | 5 +- sdk/lib/async/stream_impl.dart | 40 +++++++ tests/lib/async/stream_multi_test.dart | 140 +++++++++++++++++++++++ tests/lib_2/async/stream_multi_test.dart | 139 ++++++++++++++++++++++ 7 files changed, 459 insertions(+), 24 deletions(-) create mode 100644 tests/lib/async/stream_multi_test.dart create mode 100644 tests/lib_2/async/stream_multi_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index b58c4a0b64b..9308a95d681 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,11 @@ ### Core libraries -#### `dart:io` +#### `dart:async` -* [#42006][]: The signature of `exit` has been changed to return the - `Never`type instead of `void`. since no code will run after it, - -[#42006]: https://github.com/dart-lang/sdk/issues/42006 +* Adds `Stream.multi` constructor creating streams which can be + listened to more than once, and where each individual listener + can be controlled independently. #### `dart:convert` @@ -23,6 +22,20 @@ [#41100]: https://github.com/dart-lang/sdk/issues/41100 [WHATWG encoding standard]: https://encoding.spec.whatwg.org/#utf-8-decoder +#### `dart:io` + +* [#42006][]: The signature of `exit` has been changed to return the + `Never`type instead of `void`. since no code will run after it, +* Class `OSError` now implements `Exception`. This change means `OSError` will + now be caught in catch clauses catching `Exception`s. +* Added `InternetAddress.tryParse`. +* [Abstract Unix Domain Socket][] is supported on Linux/Android now. Using an + `InternetAddress` with `address` starting with '@' and type being + `InternetAddressType.Unix` will create an abstract Unix Domain Socket. + +[#42006]: https://github.com/dart-lang/sdk/issues/42006 +[Abstract Unix Domain Socket]: http://man7.org/linux/man-pages/man7/unix.7.html + #### `dart:html` * **Breaking Change**: `CssClassSet.add()` previously returned `null` if the @@ -37,21 +50,9 @@ `Future` is empty instead, it completes asynchronously, therefore potentially invalidating code that relied on the synchronous side-effect. This change will only affect code using sound null-safety. See issue - [41653][] for more details. + [#41653][] for more details. -[41653]: https://github.com/dart-lang/sdk/issues/41653 - - -#### `dart:io` - -* Class `OSError` now implements `Exception`. This change means `OSError` will - now be caught in catch clauses catching `Exception`s. -* Added `InternetAddress.tryParse`. -* [Abstract Unix Domain Socket][] is supported on Linux/Android now. Using an - `InternetAddress` with `address` starting with '@' and type being - `InternetAddressType.Unix` will create an abstract Unix Domain Socket. - -[Abstract Unix Domain Socket]: http://man7.org/linux/man-pages/man7/unix.7.html +[#41653]: https://github.com/dart-lang/sdk/issues/41653 ### Tools @@ -100,7 +101,7 @@ Updated the Linter to `0.1.117`, which includes: * Warn at publishing first time a package version opts in to null-safety. * Preserve Windows line endings in pubspec.lock if they are already there (#2489) * Better terminal color-detection. Use colors in terminals on Windows. -* `pub outdated`: If the current version of a dependency is a prerelease +* `pub outdated`: If the current version of a dependency is a prerelease version, use prereleases for latest if no newer stable. * `pub outdated` now works without a lockfile. In that case the 'Current' column will be empty. diff --git a/sdk/lib/async/async.dart b/sdk/lib/async/async.dart index 8cce95923bf..5e72a00f436 100644 --- a/sdk/lib/async/async.dart +++ b/sdk/lib/async/async.dart @@ -98,6 +98,7 @@ import "dart:_internal" show CastStream, CastStreamTransformer, + checkNotNullable, EmptyIterator, IterableElementError, printToZone, diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart index 46ec1617af3..e0a2ce28756 100644 --- a/sdk/lib/async/stream.dart +++ b/sdk/lib/async/stream.dart @@ -247,6 +247,77 @@ abstract class Stream { () => new _IterablePendingEvents(elements)); } + /** + * Creates a multi-subscription stream. + * + * Each time the created stream is listened to, + * the [onListen] callback is invoked with a new [MultiStreamController] + * which forwards events to the [StreamSubscription] + * returned by that [listen] call. + * + * This allows each listener to be treated as an individual stream. + * + * The [MultiStreamController] does not support reading its + * [StreamController.stream]. Setting its [StreamController.onListen] + * has no effect since the [onListen] callback is called instead, + * and the [StreamController.onListen] won't be called later. + * The controller acts like an asynchronous controller, + * but provides extra methods for delivering events synchronously. + * + * If [isBroadcast] is set to `true`, the returned stream's + * [Stream.isBroadcast] will be `true`. + * This has no effect on the stream behavior, + * it is up to the [onListen] function + * to act like a broadcast stream if it claims to be one. + * + * A multi-subscription stream can behave like any other stream. + * If the [onListen] callback throws on every call after the first, + * the stream behaves like a single-subscription stream. + * If the stream emits the same events to all current listeners, + * it behaves like a broadcast stream. + * + * It can also choose to emit different events to different listeners. + * For example, a stream which repeats the most recent + * non-`null` event to new listeners, could be implemented as this example: + * ```dart + * extension StreamRepeatLatestExtension on Stream { + * Stream repeatLatest() { + * var done = false; + * T? latest = null; + * var currentListeners = >{}; + * this.listen((event) { + * latest = event; + * for (var listener in [...currentListeners]) listener.addSync(event); + * }, onError: (Object error, StackTrace stack) { + * for (var listener in [...currentListeners]) listener.addErrorSync(error, stack); + * }, onDone: () { + * done = true; + * latest = null; + * for (var listener in currentListeners) listener.closeSync(); + * currentListeners.clear(); + * }); + * return Stream.multi((controller) { + * if (done) { + * controller.close(); + * return; + * } + * currentListeners.add(controller); + * var latestValue = latest; + * if (latestValue != null) controller.add(latestValue); + * controller.onCancel = () { + * currentListeners.remove(controller); + * }; + * }); + * } + * } + * ``` + */ + @Since("2.9") + factory Stream.multi(void Function(MultiStreamController) onListen, + {bool isBroadcast = false}) { + return _MultiStream(onListen, isBroadcast); + } + /** * Creates a stream that repeatedly emits events at [period] intervals. * @@ -2230,3 +2301,49 @@ class _ControllerEventSinkWrapper implements EventSink { _ensureSink().close(); } } + +/** + * An enhanced stream controller provided by [Stream.multi]. + * + * Acts like a normal asynchronous controller, but also allows + * adding events synchronously. + * As with any synchronous event delivery, the sender should be very careful + * to not deliver events at times when a new listener might not + * be ready to receive them. + * That generally means only delivering events synchronously in response to other + * asynchronous events, because that is a time when an asynchronous event could + * happen. + */ +@Since("2.9") +abstract class MultiStreamController implements StreamController { + /** + * Adds and delivers an event. + * + * Adds an event like [add] and attempts to deliver it immediately. + * Delivery can be delayed if other previously added events are + * still pending delivery, if the subscription is paused, + * or if the subscription isn't listening yet. + */ + void addSync(T value); + + /** + * Adds and delivers an error event. + * + * Adds an error like [addError] and attempts to deliver it immediately. + * Delivery can be delayed if other previously added events are + * still pending delivery, if the subscription is paused, + * or if the subscription isn't listening yet. + */ + void addErrorSync(Object error, [StackTrace? stackTrace]); + + /** + * Closes the controller and delivers a done event. + * + * Closes the controller like [close] and attempts to deliver a "done" + * event immediately. + * Delivery can be delayed if other previously added events are + * still pending delivery, if the subscription is paused, + * or if the subscription isn't listening yet. + */ + void closeSync(); +} diff --git a/sdk/lib/async/stream_controller.dart b/sdk/lib/async/stream_controller.dart index 9688dd17ab8..0dc3c9751ff 100644 --- a/sdk/lib/async/stream_controller.dart +++ b/sdk/lib/async/stream_controller.dart @@ -628,7 +628,7 @@ abstract class _StreamController implements _StreamControllerBase { * Send or enqueue an error event. */ void addError(Object error, [StackTrace? stackTrace]) { - // TODO(40614): Remove once non-nullability is sound. + // TODO(40614): Remove once non-nullability is sound. Use checkNotNullable. ArgumentError.checkNotNull(error, "error"); if (!_mayAddEvent) throw _badEventState(); AsyncError? replacement = Zone.current.errorCallback(error, stackTrace); @@ -804,9 +804,6 @@ abstract class _StreamController implements _StreamControllerBase { abstract class _SyncStreamControllerDispatch implements _StreamController, SynchronousStreamController { - int get _state; - void set _state(int state); - void _sendData(T data) { _subscription._add(data); } diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart index 1c6fa50c7f7..942f4a7455d 100644 --- a/sdk/lib/async/stream_impl.dart +++ b/sdk/lib/async/stream_impl.dart @@ -1097,3 +1097,43 @@ class _EmptyStream extends Stream { return new _DoneStreamSubscription(onDone); } } + +/** A stream which creates a new controller for each listener. */ +class _MultiStream extends Stream { + final bool isBroadcast; + /** The callback called for each listen. */ + final void Function(MultiStreamController) _onListen; + + _MultiStream(this._onListen, this.isBroadcast); + + StreamSubscription listen(void onData(T event)?, + {Function? onError, void onDone()?, bool? cancelOnError}) { + var controller = _MultiStreamController(); + controller.onListen = () { + _onListen(controller); + }; + return controller._subscribe( + onData, onError, onDone, cancelOnError ?? false); + } +} + +class _MultiStreamController extends _AsyncStreamController + implements MultiStreamController { + _MultiStreamController() : super(null, null, null, null); + + void addSync(T data) { + _subscription._add(data); + } + + void addErrorSync(Object error, [StackTrace? stackTrace]) { + _subscription._addError(error, stackTrace ?? StackTrace.empty); + } + + void closeSync() { + _subscription._close(); + } + + Stream get stream { + throw UnsupportedError("Not available"); + } +} diff --git a/tests/lib/async/stream_multi_test.dart b/tests/lib/async/stream_multi_test.dart new file mode 100644 index 00000000000..e57c4f4ddc7 --- /dev/null +++ b/tests/lib/async/stream_multi_test.dart @@ -0,0 +1,140 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import "package:expect/expect.dart"; +import "package:async_helper/async_helper.dart"; + +extension StreamRepeatLatestExtension on Stream { + Stream repeatLatest() { + var done = false; + T? latest = null; + var currentListeners = >{}; + this.listen((event) { + latest = event; + for (var listener in [...currentListeners]) listener.addSync(event); + }, onError: (Object error, StackTrace stack) { + for (var listener in [...currentListeners]) + listener.addErrorSync(error, stack); + }, onDone: () { + done = true; + latest = null; + for (var listener in currentListeners) listener.closeSync(); + currentListeners.clear(); + }); + return Stream.multi((controller) { + if (done) { + controller.close(); + return; + } + currentListeners.add(controller); + var latestValue = latest; + if (latestValue != null) controller.add(latestValue); + controller.onCancel = () { + currentListeners.remove(controller); + }; + }); + } +} + +void main() { + asyncStart(); + testStreamsIndependent(); + asyncTest(testStreamNonOverlap); + asyncTest(testRepeatLatest); + asyncEnd(); +} + +/// Test that the streams can provide different events. +void testStreamsIndependent() { + var log = []; + var index = 0; + var multi = Stream>.multi((c) { + var id = ++index; + log.add("$id"); + for (var i = 0; i < id + 1; i++) { + c.add([id, i]); + } + c.close(); + }); + void logList(List l) { + log.add("${l.first}-${l.last}"); + } + + asyncStart(); + Future.wait([multi.forEach(logList), multi.forEach(logList)]) + .whenComplete(() { + Expect.equals(7, log.length); + for (var element in ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"]) { + Expect.isTrue(log.contains(element)); + } + asyncEnd(); + }); +} + +/// Test that stream can be listened to again after having no listener. +Future testStreamNonOverlap() async { + var completer = Completer(); + MultiStreamController? controller; + var stream = Stream.multi((c) { + controller = c; + c.onCancel = () { + controller = null; + if (!completer.isCompleted) completer.complete(null); + }; + }); + for (var i in [1, 2, 3]) { + var log = []; + var subscription = stream.listen((v) { + log.add(v); + if (!completer.isCompleted) completer.complete(v); + }, onError: (e, s) { + log.add(e); + if (!completer.isCompleted) completer.complete(e); + }, onDone: () { + log.add(null); + if (!completer.isCompleted) completer.complete(null); + }); + Expect.isNotNull(controller); + controller!.add(1); + await completer.future; + Expect.listEquals([1], log); + + completer = Completer(); + controller!.add(2); + await completer.future; + Expect.listEquals([1, 2], log); + + completer = Completer(); + if (i == 2) { + subscription.cancel(); + } else { + controller!.close(); + } + await completer.future; + Expect.listEquals([1, 2, if (i != 2) null], log); + } +} + +/// Test that the [Stream.repeatLatest] example code works as described. +Future testRepeatLatest() async { + var c = StreamController(); + var repStream = c.stream.repeatLatest(); + + var f1 = repStream.first; + c.add(1); + var v1 = await f1; + Expect.equals(1, v1); + + var f2 = repStream.take(2).toList(); + c.add(2); + var l2 = await f2; + Expect.listEquals([1, 2], l2); + + var f3 = repStream.take(2).toList(); + c.add(3); + var l3 = await f3; + Expect.listEquals([2, 3], l3); +} diff --git a/tests/lib_2/async/stream_multi_test.dart b/tests/lib_2/async/stream_multi_test.dart new file mode 100644 index 00000000000..b5dd7e556e1 --- /dev/null +++ b/tests/lib_2/async/stream_multi_test.dart @@ -0,0 +1,139 @@ +// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import "package:expect/expect.dart"; +import "package:async_helper/async_helper.dart"; + +extension StreamRepeatLatestExtension on Stream { + Stream repeatLatest() { + var done = false; + T latest = null; + var currentListeners = >{}; + this.listen((event) { + latest = event; + for (var listener in [...currentListeners]) listener.addSync(event); + }, onError: (Object error, StackTrace stack) { + for (var listener in [...currentListeners]) + listener.addErrorSync(error, stack); + }, onDone: () { + done = true; + latest = null; + for (var listener in currentListeners) listener.closeSync(); + currentListeners.clear(); + }); + return Stream.multi((controller) { + if (done) { + controller.close(); + return; + } + currentListeners.add(controller); + if (latest != null) controller.add(latest); + controller.onCancel = () { + currentListeners.remove(controller); + }; + }); + } +} + +void main() { + asyncStart(); + testStreamsIndependent(); + asyncTest(testStreamNonOverlap); + asyncTest(testRepeatLatest); + asyncEnd(); +} + +/// Test that the streams can provide different events. +void testStreamsIndependent() { + var log = []; + var index = 0; + var multi = Stream>.multi((c) { + var id = ++index; + log.add("$id"); + for (var i = 0; i < id + 1; i++) { + c.add([id, i]); + } + c.close(); + }); + void logList(List l) { + log.add("${l.first}-${l.last}"); + } + + asyncStart(); + Future.wait([multi.forEach(logList), multi.forEach(logList)]) + .whenComplete(() { + Expect.equals(7, log.length); + for (var element in ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"]) { + Expect.isTrue(log.contains(element)); + } + asyncEnd(); + }); +} + +/// Test that stream can be listened to again after having no listener. +Future testStreamNonOverlap() async { + var completer = Completer(); + MultiStreamController controller; + var stream = Stream.multi((c) { + controller = c; + c.onCancel = () { + controller = null; + if (!completer.isCompleted) completer.complete(null); + }; + }); + for (var i in [1, 2, 3]) { + var log = []; + var subscription = stream.listen((v) { + log.add(v); + if (!completer.isCompleted) completer.complete(v); + }, onError: (e, s) { + log.add(e); + if (!completer.isCompleted) completer.complete(e); + }, onDone: () { + log.add(null); + if (!completer.isCompleted) completer.complete(null); + }); + Expect.isNotNull(controller); + controller.add(1); + await completer.future; + Expect.listEquals([1], log); + + completer = Completer(); + controller.add(2); + await completer.future; + Expect.listEquals([1, 2], log); + + completer = Completer(); + if (i == 2) { + subscription.cancel(); + } else { + controller.close(); + } + await completer.future; + Expect.listEquals([1, 2, if (i != 2) null], log); + } +} + +/// Test that the [Stream.repeatLatest] example code works as described. +Future testRepeatLatest() async { + var c = StreamController(); + var repStream = c.stream.repeatLatest(); + + var f1 = repStream.first; + c.add(1); + var v1 = await f1; + Expect.equals(1, v1); + + var f2 = repStream.take(2).toList(); + c.add(2); + var l2 = await f2; + Expect.listEquals([1, 2], l2); + + var f3 = repStream.take(2).toList(); + c.add(3); + var l3 = await f3; + Expect.listEquals([2, 3], l3); +}