1
0
mirror of https://github.com/dart-lang/sdk synced 2024-07-05 09:20:04 +00:00

Add groupBy to Stream.

R=floitsch@google.com

Review-Url: https://codereview.chromium.org/2850393003 .
This commit is contained in:
Lasse R.H. Nielsen 2017-05-08 10:25:06 +02:00
parent 0adb70eb52
commit 3e8bfb1adb
3 changed files with 425 additions and 0 deletions

View File

@ -28,6 +28,9 @@ entirely to allow inference to fill in the type.
* JSON maps are now typed as `Map<String, dynamic>` instead of
`Map<dynamic, dynamic>`. A JSON-map is not a `HashMap` or `LinkedHashMap`
anymore (but just a `Map`).
* `dart:async`
* Add `groupBy` to `Stream`. Allows splitting a string into separate streams
depending on "key" property computed from the individual events.
### Dart VM

View File

@ -386,6 +386,76 @@ abstract class Stream<T> {
return new _MapStream<T, S>(this, convert);
}
/// Groups events by a computed key.
///
/// A key is extracted from incoming events.
/// The first time a key is seen, a stream is created for it, and emitted
/// on the returned stream, along with the key, as a [StreamGroup] object.
/// Then the event is emitted on the stream ([StreamGroup.values])
/// corresponding to the key.
///
/// An error on the source stream, or when calling the `key` functions,
/// will emit the error on the returned stream.
///
/// Canceling the subscription on the returned stream will stop processing
/// and close the streams for all groups.
///
/// Pausing the subscription on the returned stream will pause processing
/// and no further events are added to streams for the individual groups.
///
/// Pausing or canceling an individual group stream has no effect other than
/// on that stream. Events will be queued while the group stream
/// is paused and until it is first listened to.
/// If the [StreamGroup.values] stream is never listened to,
/// it will enqueue all the events unnecessarily.
Stream<StreamGroup<K, T>> groupBy<K>(K key(T event)) {
var controller;
controller = new StreamController<StreamGroup<K, T>>(
sync: true,
onListen: () {
var groupControllers = new HashMap<K, StreamController<T>>();
void closeAll() {
for (var groupController in groupControllers.values) {
groupController.close();
}
}
var subscription = this.listen(
(data) {
K theKey;
try {
theKey = key(data);
} catch (error, stackTrace) {
controller.addError(error, stackTrace);
return;
}
var groupController = groupControllers[theKey];
if (groupController == null) {
groupController =
new StreamController<T>.broadcast(sync: true);
groupControllers[theKey] = groupController;
controller.add(
new StreamGroup<K, T>(theKey, groupController.stream));
}
groupController.add(data);
},
onError: controller.addError,
onDone: () {
controller.close();
closeAll();
});
controller.onPause = subscription.pause;
controller.onResume = subscription.resume;
controller.onCancel = () {
subscription.cancel();
// Don't fire sync events in response to a callback.
scheduleMicrotask(closeAll);
};
});
return controller.stream;
}
/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
@ -1796,3 +1866,36 @@ class _ControllerEventSinkWrapper<T> implements EventSink<T> {
_sink.close();
}
}
/// A group created by [Stream.groupBy] or [Stream.groupByMapped].
///
/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key
/// it encounters.
/// This group contains the [key] itself, along with a stream of the [values]
/// associated with that key.
class StreamGroup<K, V> {
/// The key that identifiers the values emitted by [values].
final K key;
/// The [values] that [GroupBy] have grouped by the common [key].
final Stream<V> values;
factory StreamGroup(K key, Stream<V> values) = StreamGroup<K, V>._;
// Don't expose a generative constructor.
// This class is not intended for subclassing, so we don't want to promise
// it. We can change that in the future.
StreamGroup._(this.key, this.values);
/// Tells [values] to discard values instead of retaining them.
///
/// Must only be used instead of listening to the [values] stream.
/// If the stream has been listened to, this call fails.
/// After calling this method, listening on the [values] stream fails.
Future cancel() {
// If values has been listened to,
// this throws a StateError saying that stream has already been listened to,
// which is a correct error message for this call too.
return values.listen(null).cancel();
}
}

View File

@ -0,0 +1,319 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library stream_group_by_test;
import "dart:async";
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
int len(x) => x.length;
String wrap(x) => "[$x]";
void main() {
asyncStart();
// groupBy.
test("splits", () async {
var grouped = stringStream.groupBy<int>(len);
var byLength = <int, Future<List<String>>>{};
await for (StreamGroup<int, String> group in grouped) {
byLength[group.key] = group.values.toList();
}
Expect.listEquals([1, 2, 4, 3], byLength.keys.toList());
expectCompletes(byLength[1], ["a", "b"]);
expectCompletes(byLength[2], ["ab"]);
expectCompletes(byLength[3], ["abe", "lea"]);
expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]);
});
test("empty", () async {
var grouped = emptyStream.groupBy<int>(len);
var byLength = <int, Future<List<String>>>{};
await for (StreamGroup<int, String> group in grouped) {
byLength[group.key] = group.values.toList();
}
Expect.isTrue(byLength.isEmpty);
});
test("single group", () async {
var grouped = repeatStream(5, "x").groupBy<int>(len);
var byLength = <int, Future<List<String>>>{};
await for (StreamGroup<int, String> group in grouped) {
byLength[group.key] = group.values.toList();
}
Expect.listEquals([1], byLength.keys.toList());
expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]);
});
test("with error", () async {
var grouped = stringErrorStream(3).groupBy<int>(len);
var byLength = <int, Future<List<String>>>{};
bool caught = false;
try {
await for (StreamGroup<int, String> group in grouped) {
byLength[group.key] = group.values.toList();
}
} catch (e) {
Expect.equals("BAD", e);
caught = true;
}
Expect.isTrue(caught);
Expect.listEquals([1, 2, 4], byLength.keys.toList());
expectCompletes(byLength[1], ["a", "b"]);
expectCompletes(byLength[2], ["ab"]);
expectCompletes(byLength[4], ["abel"]);
});
// For comparison with later tests.
test("no pause or cancel", () async {
var grouped = stringStream.groupBy<int>(len);
var events = [];
var futures = [];
await grouped.forEach((sg) {
var key = sg.key;
var sub;
sub = sg.values.listen((value) {
events.add("$key:$value");
});
var c = new Completer();
futures.add(c.future);
sub.onDone(() {
c.complete(null);
});
});
await Future.wait(futures);
Expect.listEquals([
"1:a",
"2:ab",
"1:b",
"4:abel",
"3:abe",
"4:bell",
"4:able",
"4:abba",
"3:lea",
], events);
});
test("pause on group", () async {
// Pausing the individial group's stream just makes it buffer.
var grouped = stringStream.groupBy<int>(len);
var events = [];
var futures = [];
await grouped.forEach((sg) {
var key = sg.key;
var sub;
sub = sg.values.listen((value) {
events.add("$key:$value");
if (value == "a") {
// Pause until a later timer event, which is after stringStream
// has delivered all events.
sub.pause(new Future.delayed(Duration.ZERO, () {}));
}
});
var c = new Completer();
futures.add(c.future);
sub.onDone(() {
c.complete(null);
});
});
await Future.wait(futures);
Expect.listEquals([
"1:a",
"2:ab",
"4:abel",
"3:abe",
"4:bell",
"4:able",
"4:abba",
"3:lea",
"1:b"
], events);
});
test("pause on group-stream", () async {
// Pausing the stream returned by groupBy stops everything.
var grouped = stringStream.groupBy<int>(len);
var events = [];
var futures = [];
var done = new Completer();
var sub;
sub = grouped.listen((sg) {
var key = sg.key;
futures.add(sg.values.forEach((value) {
events.add("$key:$value");
if (value == "a") {
// Pause everything until a later timer event.
asyncStart();
var eventSnapshot = events.toList();
var delay = new Future.delayed(Duration.ZERO).then((_) {
// No events added.
Expect.listEquals(eventSnapshot, events);
asyncEnd(); // Ensures this test has run.
});
sub.pause(delay);
}
}));
});
sub.onDone(() {
done.complete(null);
});
futures.add(done.future);
await Future.wait(futures);
Expect.listEquals([
"1:a",
"2:ab",
"1:b",
"4:abel",
"3:abe",
"4:bell",
"4:able",
"4:abba",
"3:lea",
], events);
});
test("cancel on group", () async {
// Cancelling the individial group's stream just makes that one stop.
var grouped = stringStream.groupBy<int>(len);
var events = [];
var futures = [];
await grouped.forEach((sg) {
var key = sg.key;
var sub;
var c = new Completer();
sub = sg.values.listen((value) {
events.add("$key:$value");
if (value == "bell") {
// Pause until a later timer event, which is after stringStream
// has delivered all events.
sub.cancel();
c.complete(null);
}
});
futures.add(c.future);
sub.onDone(() {
c.complete(null);
});
});
await Future.wait(futures);
Expect.listEquals([
"1:a",
"2:ab",
"1:b",
"4:abel",
"3:abe",
"4:bell",
"3:lea",
], events);
});
test("cancel on group-stream", () async {
// Cancel the stream returned by groupBy ends everything.
var grouped = stringStream.groupBy<int>(len);
var events = [];
var futures = [];
var done = new Completer();
var sub;
sub = grouped.listen((sg) {
var key = sg.key;
futures.add(sg.values.forEach((value) {
events.add("$key:$value");
if (value == "bell") {
// Pause everything until a later timer event.
futures.add(sub.cancel());
done.complete();
}
}));
});
futures.add(done.future);
await Future.wait(futures);
Expect.listEquals([
"1:a",
"2:ab",
"1:b",
"4:abel",
"3:abe",
"4:bell",
], events);
});
asyncEnd();
}
expectCompletes(future, result) {
asyncStart();
future.then((v) {
if (result is List) {
Expect.listEquals(result, v);
} else {
Expect.equals(v, result);
}
asyncEnd();
}, onError: (e, s) {
Expect.fail("$e\n$s");
});
}
void test(name, func) {
asyncStart();
func().then((_) {
asyncEnd();
}, onError: (e, s) {
Expect.fail("$name: $e\n$s");
});
}
var strings = const [
"a",
"ab",
"b",
"abel",
"abe",
"bell",
"able",
"abba",
"lea"
];
Stream<String> get stringStream async* {
for (var string in strings) {
yield string;
}
}
Stream get emptyStream async* {}
Stream repeatStream(int count, value) async* {
for (var i = 0; i < count; i++) {
yield value;
}
}
// Just some valid stack trace.
var stack = StackTrace.current;
Stream<String> stringErrorStream(int errorAfter) async* {
for (int i = 0; i < strings.length; i++) {
yield strings[i];
if (i == errorAfter) {
// Emit error, but continue afterwards.
yield* new Future.error("BAD", stack).asStream();
}
}
}
Stream intStream(int count, [int start = 0]) async* {
for (int i = 0; i < count; i++) {
yield start++;
}
}
Stream timerStream(int count, Duration interval) async* {
for (int i = 0; i < count; i++) {
await new Future.delayed(interval);
yield i;
}
}