1
0
mirror of https://github.com/dart-lang/sdk synced 2024-07-03 00:08:46 +00:00

Add Stream.multi constructor.

A generalized stream which provides a controller for each listener.
Can be used to implement both broadcast streams and single subscription streams,
as well as any stream behavior between the two.

Change-Id: I7a75f8736ca6bc91ce266e768db68536efd24dfe
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/150936
Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
Reviewed-by: Nate Bosch <nbosch@google.com>
This commit is contained in:
Lasse Reichstein Holst Nielsen 2020-06-29 14:04:53 +00:00 committed by commit-bot@chromium.org
parent ff114a9828
commit 17d0ba55e8
7 changed files with 459 additions and 24 deletions

View File

@ -4,12 +4,11 @@
### Core libraries
#### `dart:io`
#### `dart:async`
* [#42006][]: The signature of `exit` has been changed to return the
`Never`type instead of `void`. since no code will run after it,
[#42006]: https://github.com/dart-lang/sdk/issues/42006
* Adds `Stream.multi` constructor creating streams which can be
listened to more than once, and where each individual listener
can be controlled independently.
#### `dart:convert`
@ -23,6 +22,20 @@
[#41100]: https://github.com/dart-lang/sdk/issues/41100
[WHATWG encoding standard]: https://encoding.spec.whatwg.org/#utf-8-decoder
#### `dart:io`
* [#42006][]: The signature of `exit` has been changed to return the
`Never`type instead of `void`. since no code will run after it,
* Class `OSError` now implements `Exception`. This change means `OSError` will
now be caught in catch clauses catching `Exception`s.
* Added `InternetAddress.tryParse`.
* [Abstract Unix Domain Socket][] is supported on Linux/Android now. Using an
`InternetAddress` with `address` starting with '@' and type being
`InternetAddressType.Unix` will create an abstract Unix Domain Socket.
[#42006]: https://github.com/dart-lang/sdk/issues/42006
[Abstract Unix Domain Socket]: http://man7.org/linux/man-pages/man7/unix.7.html
#### `dart:html`
* **Breaking Change**: `CssClassSet.add()` previously returned `null` if the
@ -37,21 +50,9 @@
`Future` is empty instead, it completes asynchronously, therefore
potentially invalidating code that relied on the synchronous side-effect.
This change will only affect code using sound null-safety. See issue
[41653][] for more details.
[#41653][] for more details.
[41653]: https://github.com/dart-lang/sdk/issues/41653
#### `dart:io`
* Class `OSError` now implements `Exception`. This change means `OSError` will
now be caught in catch clauses catching `Exception`s.
* Added `InternetAddress.tryParse`.
* [Abstract Unix Domain Socket][] is supported on Linux/Android now. Using an
`InternetAddress` with `address` starting with '@' and type being
`InternetAddressType.Unix` will create an abstract Unix Domain Socket.
[Abstract Unix Domain Socket]: http://man7.org/linux/man-pages/man7/unix.7.html
[#41653]: https://github.com/dart-lang/sdk/issues/41653
### Tools
@ -100,7 +101,7 @@ Updated the Linter to `0.1.117`, which includes:
* Warn at publishing first time a package version opts in to null-safety.
* Preserve Windows line endings in pubspec.lock if they are already there (#2489)
* Better terminal color-detection. Use colors in terminals on Windows.
* `pub outdated`: If the current version of a dependency is a prerelease
* `pub outdated`: If the current version of a dependency is a prerelease
version, use prereleases for latest if no newer stable.
* `pub outdated` now works without a lockfile. In that case the 'Current'
column will be empty.

View File

@ -98,6 +98,7 @@ import "dart:_internal"
show
CastStream,
CastStreamTransformer,
checkNotNullable,
EmptyIterator,
IterableElementError,
printToZone,

View File

@ -247,6 +247,77 @@ abstract class Stream<T> {
() => new _IterablePendingEvents<T>(elements));
}
/**
* Creates a multi-subscription stream.
*
* Each time the created stream is listened to,
* the [onListen] callback is invoked with a new [MultiStreamController]
* which forwards events to the [StreamSubscription]
* returned by that [listen] call.
*
* This allows each listener to be treated as an individual stream.
*
* The [MultiStreamController] does not support reading its
* [StreamController.stream]. Setting its [StreamController.onListen]
* has no effect since the [onListen] callback is called instead,
* and the [StreamController.onListen] won't be called later.
* The controller acts like an asynchronous controller,
* but provides extra methods for delivering events synchronously.
*
* If [isBroadcast] is set to `true`, the returned stream's
* [Stream.isBroadcast] will be `true`.
* This has no effect on the stream behavior,
* it is up to the [onListen] function
* to act like a broadcast stream if it claims to be one.
*
* A multi-subscription stream can behave like any other stream.
* If the [onListen] callback throws on every call after the first,
* the stream behaves like a single-subscription stream.
* If the stream emits the same events to all current listeners,
* it behaves like a broadcast stream.
*
* It can also choose to emit different events to different listeners.
* For example, a stream which repeats the most recent
* non-`null` event to new listeners, could be implemented as this example:
* ```dart
* extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
* Stream<T> repeatLatest() {
* var done = false;
* T? latest = null;
* var currentListeners = <MultiStreamController<T>>{};
* this.listen((event) {
* latest = event;
* for (var listener in [...currentListeners]) listener.addSync(event);
* }, onError: (Object error, StackTrace stack) {
* for (var listener in [...currentListeners]) listener.addErrorSync(error, stack);
* }, onDone: () {
* done = true;
* latest = null;
* for (var listener in currentListeners) listener.closeSync();
* currentListeners.clear();
* });
* return Stream.multi((controller) {
* if (done) {
* controller.close();
* return;
* }
* currentListeners.add(controller);
* var latestValue = latest;
* if (latestValue != null) controller.add(latestValue);
* controller.onCancel = () {
* currentListeners.remove(controller);
* };
* });
* }
* }
* ```
*/
@Since("2.9")
factory Stream.multi(void Function(MultiStreamController<T>) onListen,
{bool isBroadcast = false}) {
return _MultiStream<T>(onListen, isBroadcast);
}
/**
* Creates a stream that repeatedly emits events at [period] intervals.
*
@ -2230,3 +2301,49 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
_ensureSink().close();
}
}
/**
* An enhanced stream controller provided by [Stream.multi].
*
* Acts like a normal asynchronous controller, but also allows
* adding events synchronously.
* As with any synchronous event delivery, the sender should be very careful
* to not deliver events at times when a new listener might not
* be ready to receive them.
* That generally means only delivering events synchronously in response to other
* asynchronous events, because that is a time when an asynchronous event could
* happen.
*/
@Since("2.9")
abstract class MultiStreamController<T> implements StreamController<T> {
/**
* Adds and delivers an event.
*
* Adds an event like [add] and attempts to deliver it immediately.
* Delivery can be delayed if other previously added events are
* still pending delivery, if the subscription is paused,
* or if the subscription isn't listening yet.
*/
void addSync(T value);
/**
* Adds and delivers an error event.
*
* Adds an error like [addError] and attempts to deliver it immediately.
* Delivery can be delayed if other previously added events are
* still pending delivery, if the subscription is paused,
* or if the subscription isn't listening yet.
*/
void addErrorSync(Object error, [StackTrace? stackTrace]);
/**
* Closes the controller and delivers a done event.
*
* Closes the controller like [close] and attempts to deliver a "done"
* event immediately.
* Delivery can be delayed if other previously added events are
* still pending delivery, if the subscription is paused,
* or if the subscription isn't listening yet.
*/
void closeSync();
}

View File

@ -628,7 +628,7 @@ abstract class _StreamController<T> implements _StreamControllerBase<T> {
* Send or enqueue an error event.
*/
void addError(Object error, [StackTrace? stackTrace]) {
// TODO(40614): Remove once non-nullability is sound.
// TODO(40614): Remove once non-nullability is sound. Use checkNotNullable.
ArgumentError.checkNotNull(error, "error");
if (!_mayAddEvent) throw _badEventState();
AsyncError? replacement = Zone.current.errorCallback(error, stackTrace);
@ -804,9 +804,6 @@ abstract class _StreamController<T> implements _StreamControllerBase<T> {
abstract class _SyncStreamControllerDispatch<T>
implements _StreamController<T>, SynchronousStreamController<T> {
int get _state;
void set _state(int state);
void _sendData(T data) {
_subscription._add(data);
}

View File

@ -1097,3 +1097,43 @@ class _EmptyStream<T> extends Stream<T> {
return new _DoneStreamSubscription<T>(onDone);
}
}
/** A stream which creates a new controller for each listener. */
class _MultiStream<T> extends Stream<T> {
final bool isBroadcast;
/** The callback called for each listen. */
final void Function(MultiStreamController<T>) _onListen;
_MultiStream(this._onListen, this.isBroadcast);
StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
var controller = _MultiStreamController<T>();
controller.onListen = () {
_onListen(controller);
};
return controller._subscribe(
onData, onError, onDone, cancelOnError ?? false);
}
}
class _MultiStreamController<T> extends _AsyncStreamController<T>
implements MultiStreamController<T> {
_MultiStreamController() : super(null, null, null, null);
void addSync(T data) {
_subscription._add(data);
}
void addErrorSync(Object error, [StackTrace? stackTrace]) {
_subscription._addError(error, stackTrace ?? StackTrace.empty);
}
void closeSync() {
_subscription._close();
}
Stream<T> get stream {
throw UnsupportedError("Not available");
}
}

View File

@ -0,0 +1,140 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
Stream<T> repeatLatest() {
var done = false;
T? latest = null;
var currentListeners = <MultiStreamController<T>>{};
this.listen((event) {
latest = event;
for (var listener in [...currentListeners]) listener.addSync(event);
}, onError: (Object error, StackTrace stack) {
for (var listener in [...currentListeners])
listener.addErrorSync(error, stack);
}, onDone: () {
done = true;
latest = null;
for (var listener in currentListeners) listener.closeSync();
currentListeners.clear();
});
return Stream.multi((controller) {
if (done) {
controller.close();
return;
}
currentListeners.add(controller);
var latestValue = latest;
if (latestValue != null) controller.add(latestValue);
controller.onCancel = () {
currentListeners.remove(controller);
};
});
}
}
void main() {
asyncStart();
testStreamsIndependent();
asyncTest(testStreamNonOverlap);
asyncTest(testRepeatLatest);
asyncEnd();
}
/// Test that the streams can provide different events.
void testStreamsIndependent() {
var log = <String>[];
var index = 0;
var multi = Stream<List<int>>.multi((c) {
var id = ++index;
log.add("$id");
for (var i = 0; i < id + 1; i++) {
c.add([id, i]);
}
c.close();
});
void logList(List<int> l) {
log.add("${l.first}-${l.last}");
}
asyncStart();
Future.wait([multi.forEach(logList), multi.forEach(logList)])
.whenComplete(() {
Expect.equals(7, log.length);
for (var element in ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"]) {
Expect.isTrue(log.contains(element));
}
asyncEnd();
});
}
/// Test that stream can be listened to again after having no listener.
Future<void> testStreamNonOverlap() async {
var completer = Completer<Object?>();
MultiStreamController<int>? controller;
var stream = Stream<int>.multi((c) {
controller = c;
c.onCancel = () {
controller = null;
if (!completer.isCompleted) completer.complete(null);
};
});
for (var i in [1, 2, 3]) {
var log = <Object?>[];
var subscription = stream.listen((v) {
log.add(v);
if (!completer.isCompleted) completer.complete(v);
}, onError: (e, s) {
log.add(e);
if (!completer.isCompleted) completer.complete(e);
}, onDone: () {
log.add(null);
if (!completer.isCompleted) completer.complete(null);
});
Expect.isNotNull(controller);
controller!.add(1);
await completer.future;
Expect.listEquals([1], log);
completer = Completer();
controller!.add(2);
await completer.future;
Expect.listEquals([1, 2], log);
completer = Completer();
if (i == 2) {
subscription.cancel();
} else {
controller!.close();
}
await completer.future;
Expect.listEquals([1, 2, if (i != 2) null], log);
}
}
/// Test that the [Stream.repeatLatest] example code works as described.
Future<void> testRepeatLatest() async {
var c = StreamController<int>();
var repStream = c.stream.repeatLatest();
var f1 = repStream.first;
c.add(1);
var v1 = await f1;
Expect.equals(1, v1);
var f2 = repStream.take(2).toList();
c.add(2);
var l2 = await f2;
Expect.listEquals([1, 2], l2);
var f3 = repStream.take(2).toList();
c.add(3);
var l3 = await f3;
Expect.listEquals([2, 3], l3);
}

View File

@ -0,0 +1,139 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
extension StreamRepeatLatestExtension<T> on Stream<T> {
Stream<T> repeatLatest() {
var done = false;
T latest = null;
var currentListeners = <MultiStreamController<T>>{};
this.listen((event) {
latest = event;
for (var listener in [...currentListeners]) listener.addSync(event);
}, onError: (Object error, StackTrace stack) {
for (var listener in [...currentListeners])
listener.addErrorSync(error, stack);
}, onDone: () {
done = true;
latest = null;
for (var listener in currentListeners) listener.closeSync();
currentListeners.clear();
});
return Stream.multi((controller) {
if (done) {
controller.close();
return;
}
currentListeners.add(controller);
if (latest != null) controller.add(latest);
controller.onCancel = () {
currentListeners.remove(controller);
};
});
}
}
void main() {
asyncStart();
testStreamsIndependent();
asyncTest(testStreamNonOverlap);
asyncTest(testRepeatLatest);
asyncEnd();
}
/// Test that the streams can provide different events.
void testStreamsIndependent() {
var log = <String>[];
var index = 0;
var multi = Stream<List<int>>.multi((c) {
var id = ++index;
log.add("$id");
for (var i = 0; i < id + 1; i++) {
c.add([id, i]);
}
c.close();
});
void logList(List<int> l) {
log.add("${l.first}-${l.last}");
}
asyncStart();
Future.wait([multi.forEach(logList), multi.forEach(logList)])
.whenComplete(() {
Expect.equals(7, log.length);
for (var element in ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"]) {
Expect.isTrue(log.contains(element));
}
asyncEnd();
});
}
/// Test that stream can be listened to again after having no listener.
Future<void> testStreamNonOverlap() async {
var completer = Completer<Object>();
MultiStreamController<int> controller;
var stream = Stream<int>.multi((c) {
controller = c;
c.onCancel = () {
controller = null;
if (!completer.isCompleted) completer.complete(null);
};
});
for (var i in [1, 2, 3]) {
var log = <Object>[];
var subscription = stream.listen((v) {
log.add(v);
if (!completer.isCompleted) completer.complete(v);
}, onError: (e, s) {
log.add(e);
if (!completer.isCompleted) completer.complete(e);
}, onDone: () {
log.add(null);
if (!completer.isCompleted) completer.complete(null);
});
Expect.isNotNull(controller);
controller.add(1);
await completer.future;
Expect.listEquals([1], log);
completer = Completer();
controller.add(2);
await completer.future;
Expect.listEquals([1, 2], log);
completer = Completer();
if (i == 2) {
subscription.cancel();
} else {
controller.close();
}
await completer.future;
Expect.listEquals([1, 2, if (i != 2) null], log);
}
}
/// Test that the [Stream.repeatLatest] example code works as described.
Future<void> testRepeatLatest() async {
var c = StreamController<int>();
var repStream = c.stream.repeatLatest();
var f1 = repStream.first;
c.add(1);
var v1 = await f1;
Expect.equals(1, v1);
var f2 = repStream.take(2).toList();
c.add(2);
var l2 = await f2;
Expect.listEquals([1, 2], l2);
var f3 = repStream.take(2).toList();
c.add(3);
var l3 = await f3;
Expect.listEquals([2, 3], l3);
}