diff --git a/CHANGELOG.md b/CHANGELOG.md index fbf02d27bd4..518f18d0f50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,9 @@ entirely to allow inference to fill in the type. * JSON maps are now typed as `Map` instead of `Map`. A JSON-map is not a `HashMap` or `LinkedHashMap` anymore (but just a `Map`). +* `dart:async` + * Add `groupBy` to `Stream`. Allows splitting a string into separate streams + depending on "key" property computed from the individual events. ### Dart VM diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart index 27c0ff78371..9abe75c54d4 100644 --- a/sdk/lib/async/stream.dart +++ b/sdk/lib/async/stream.dart @@ -386,6 +386,76 @@ abstract class Stream { return new _MapStream(this, convert); } + /// Groups events by a computed key. + /// + /// A key is extracted from incoming events. + /// The first time a key is seen, a stream is created for it, and emitted + /// on the returned stream, along with the key, as a [StreamGroup] object. + /// Then the event is emitted on the stream ([StreamGroup.values]) + /// corresponding to the key. + /// + /// An error on the source stream, or when calling the `key` functions, + /// will emit the error on the returned stream. + /// + /// Canceling the subscription on the returned stream will stop processing + /// and close the streams for all groups. + /// + /// Pausing the subscription on the returned stream will pause processing + /// and no further events are added to streams for the individual groups. + /// + /// Pausing or canceling an individual group stream has no effect other than + /// on that stream. Events will be queued while the group stream + /// is paused and until it is first listened to. + /// If the [StreamGroup.values] stream is never listened to, + /// it will enqueue all the events unnecessarily. + Stream> groupBy(K key(T event)) { + var controller; + controller = new StreamController>( + sync: true, + onListen: () { + var groupControllers = new HashMap>(); + + void closeAll() { + for (var groupController in groupControllers.values) { + groupController.close(); + } + } + + var subscription = this.listen( + (data) { + K theKey; + try { + theKey = key(data); + } catch (error, stackTrace) { + controller.addError(error, stackTrace); + return; + } + var groupController = groupControllers[theKey]; + if (groupController == null) { + groupController = + new StreamController.broadcast(sync: true); + groupControllers[theKey] = groupController; + controller.add( + new StreamGroup(theKey, groupController.stream)); + } + groupController.add(data); + }, + onError: controller.addError, + onDone: () { + controller.close(); + closeAll(); + }); + controller.onPause = subscription.pause; + controller.onResume = subscription.resume; + controller.onCancel = () { + subscription.cancel(); + // Don't fire sync events in response to a callback. + scheduleMicrotask(closeAll); + }; + }); + return controller.stream; + } + /** * Creates a new stream with each data event of this stream asynchronously * mapped to a new event. @@ -1796,3 +1866,36 @@ class _ControllerEventSinkWrapper implements EventSink { _sink.close(); } } + +/// A group created by [Stream.groupBy] or [Stream.groupByMapped]. +/// +/// The stream created by `groupBy` emits a `StreamGroup` for each distinct key +/// it encounters. +/// This group contains the [key] itself, along with a stream of the [values] +/// associated with that key. +class StreamGroup { + /// The key that identifiers the values emitted by [values]. + final K key; + + /// The [values] that [GroupBy] have grouped by the common [key]. + final Stream values; + + factory StreamGroup(K key, Stream values) = StreamGroup._; + + // Don't expose a generative constructor. + // This class is not intended for subclassing, so we don't want to promise + // it. We can change that in the future. + StreamGroup._(this.key, this.values); + + /// Tells [values] to discard values instead of retaining them. + /// + /// Must only be used instead of listening to the [values] stream. + /// If the stream has been listened to, this call fails. + /// After calling this method, listening on the [values] stream fails. + Future cancel() { + // If values has been listened to, + // this throws a StateError saying that stream has already been listened to, + // which is a correct error message for this call too. + return values.listen(null).cancel(); + } +} diff --git a/tests/lib/async/stream_group_by_test.dart b/tests/lib/async/stream_group_by_test.dart new file mode 100644 index 00000000000..edba9e089a9 --- /dev/null +++ b/tests/lib/async/stream_group_by_test.dart @@ -0,0 +1,319 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +library stream_group_by_test; + +import "dart:async"; + +import "package:expect/expect.dart"; +import "package:async_helper/async_helper.dart"; + +int len(x) => x.length; +String wrap(x) => "[$x]"; + +void main() { + asyncStart(); + // groupBy. + test("splits", () async { + var grouped = stringStream.groupBy(len); + var byLength = >>{}; + await for (StreamGroup group in grouped) { + byLength[group.key] = group.values.toList(); + } + Expect.listEquals([1, 2, 4, 3], byLength.keys.toList()); + expectCompletes(byLength[1], ["a", "b"]); + expectCompletes(byLength[2], ["ab"]); + expectCompletes(byLength[3], ["abe", "lea"]); + expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]); + }); + + test("empty", () async { + var grouped = emptyStream.groupBy(len); + var byLength = >>{}; + await for (StreamGroup group in grouped) { + byLength[group.key] = group.values.toList(); + } + Expect.isTrue(byLength.isEmpty); + }); + + test("single group", () async { + var grouped = repeatStream(5, "x").groupBy(len); + var byLength = >>{}; + await for (StreamGroup group in grouped) { + byLength[group.key] = group.values.toList(); + } + Expect.listEquals([1], byLength.keys.toList()); + expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]); + }); + + test("with error", () async { + var grouped = stringErrorStream(3).groupBy(len); + var byLength = >>{}; + bool caught = false; + try { + await for (StreamGroup group in grouped) { + byLength[group.key] = group.values.toList(); + } + } catch (e) { + Expect.equals("BAD", e); + caught = true; + } + Expect.isTrue(caught); + Expect.listEquals([1, 2, 4], byLength.keys.toList()); + expectCompletes(byLength[1], ["a", "b"]); + expectCompletes(byLength[2], ["ab"]); + expectCompletes(byLength[4], ["abel"]); + }); + + // For comparison with later tests. + test("no pause or cancel", () async { + var grouped = stringStream.groupBy(len); + var events = []; + var futures = []; + await grouped.forEach((sg) { + var key = sg.key; + var sub; + sub = sg.values.listen((value) { + events.add("$key:$value"); + }); + var c = new Completer(); + futures.add(c.future); + sub.onDone(() { + c.complete(null); + }); + }); + await Future.wait(futures); + Expect.listEquals([ + "1:a", + "2:ab", + "1:b", + "4:abel", + "3:abe", + "4:bell", + "4:able", + "4:abba", + "3:lea", + ], events); + }); + + test("pause on group", () async { + // Pausing the individial group's stream just makes it buffer. + var grouped = stringStream.groupBy(len); + var events = []; + var futures = []; + await grouped.forEach((sg) { + var key = sg.key; + var sub; + sub = sg.values.listen((value) { + events.add("$key:$value"); + if (value == "a") { + // Pause until a later timer event, which is after stringStream + // has delivered all events. + sub.pause(new Future.delayed(Duration.ZERO, () {})); + } + }); + var c = new Completer(); + futures.add(c.future); + sub.onDone(() { + c.complete(null); + }); + }); + await Future.wait(futures); + Expect.listEquals([ + "1:a", + "2:ab", + "4:abel", + "3:abe", + "4:bell", + "4:able", + "4:abba", + "3:lea", + "1:b" + ], events); + }); + + test("pause on group-stream", () async { + // Pausing the stream returned by groupBy stops everything. + var grouped = stringStream.groupBy(len); + var events = []; + var futures = []; + var done = new Completer(); + var sub; + sub = grouped.listen((sg) { + var key = sg.key; + futures.add(sg.values.forEach((value) { + events.add("$key:$value"); + if (value == "a") { + // Pause everything until a later timer event. + asyncStart(); + var eventSnapshot = events.toList(); + var delay = new Future.delayed(Duration.ZERO).then((_) { + // No events added. + Expect.listEquals(eventSnapshot, events); + asyncEnd(); // Ensures this test has run. + }); + sub.pause(delay); + } + })); + }); + sub.onDone(() { + done.complete(null); + }); + futures.add(done.future); + await Future.wait(futures); + Expect.listEquals([ + "1:a", + "2:ab", + "1:b", + "4:abel", + "3:abe", + "4:bell", + "4:able", + "4:abba", + "3:lea", + ], events); + }); + + test("cancel on group", () async { + // Cancelling the individial group's stream just makes that one stop. + var grouped = stringStream.groupBy(len); + var events = []; + var futures = []; + await grouped.forEach((sg) { + var key = sg.key; + var sub; + var c = new Completer(); + sub = sg.values.listen((value) { + events.add("$key:$value"); + if (value == "bell") { + // Pause until a later timer event, which is after stringStream + // has delivered all events. + sub.cancel(); + c.complete(null); + } + }); + futures.add(c.future); + sub.onDone(() { + c.complete(null); + }); + }); + await Future.wait(futures); + Expect.listEquals([ + "1:a", + "2:ab", + "1:b", + "4:abel", + "3:abe", + "4:bell", + "3:lea", + ], events); + }); + + test("cancel on group-stream", () async { + // Cancel the stream returned by groupBy ends everything. + var grouped = stringStream.groupBy(len); + var events = []; + var futures = []; + var done = new Completer(); + var sub; + sub = grouped.listen((sg) { + var key = sg.key; + futures.add(sg.values.forEach((value) { + events.add("$key:$value"); + if (value == "bell") { + // Pause everything until a later timer event. + futures.add(sub.cancel()); + done.complete(); + } + })); + }); + futures.add(done.future); + await Future.wait(futures); + Expect.listEquals([ + "1:a", + "2:ab", + "1:b", + "4:abel", + "3:abe", + "4:bell", + ], events); + }); + + asyncEnd(); +} + +expectCompletes(future, result) { + asyncStart(); + future.then((v) { + if (result is List) { + Expect.listEquals(result, v); + } else { + Expect.equals(v, result); + } + asyncEnd(); + }, onError: (e, s) { + Expect.fail("$e\n$s"); + }); +} + +void test(name, func) { + asyncStart(); + func().then((_) { + asyncEnd(); + }, onError: (e, s) { + Expect.fail("$name: $e\n$s"); + }); +} + +var strings = const [ + "a", + "ab", + "b", + "abel", + "abe", + "bell", + "able", + "abba", + "lea" +]; + +Stream get stringStream async* { + for (var string in strings) { + yield string; + } +} + +Stream get emptyStream async* {} + +Stream repeatStream(int count, value) async* { + for (var i = 0; i < count; i++) { + yield value; + } +} + +// Just some valid stack trace. +var stack = StackTrace.current; + +Stream stringErrorStream(int errorAfter) async* { + for (int i = 0; i < strings.length; i++) { + yield strings[i]; + if (i == errorAfter) { + // Emit error, but continue afterwards. + yield* new Future.error("BAD", stack).asStream(); + } + } +} + +Stream intStream(int count, [int start = 0]) async* { + for (int i = 0; i < count; i++) { + yield start++; + } +} + +Stream timerStream(int count, Duration interval) async* { + for (int i = 0; i < count; i++) { + await new Future.delayed(interval); + yield i; + } +}