Add Future.any and Stream.fromFutures.

R=floitsch@google.com

Review URL: https://codereview.chromium.org/1563223002 .
This commit is contained in:
Lasse R.H. Nielsen 2016-01-08 10:17:34 +01:00
parent 7c5e03435b
commit 8487d57e48
5 changed files with 273 additions and 0 deletions

View file

@ -1,6 +1,10 @@
## 1.14.0
### Core library changes
* `dart:async`
* Added `Future.any` static method.
* Added `Stream.fromFutures` constructor.
* `dart:convert`
* `Base64Decoder.convert` now takes optional `start` and `end` parameters.

View file

@ -316,6 +316,31 @@ abstract class Future<T> {
return result;
}
/**
* Returns the result of the first future in [futures] to complete.
*
* The returned future is completed with the result of the first
* future in [futures] to report that it is complete.
* The results of all the other futures are discarded.
*
* If [futures] is empty, or if none of its futures complete,
* the returned future never completes.
*/
static Future/*<T>*/ any/*<T>*/(Iterable<Future/*<T>*/> futures) {
var completer = new Completer.sync();
var onValue = (value) {
if (!completer.isCompleted) completer.complete(value);
};
var onError = (error, stack) {
if (!completer.isCompleted) completer.completeError(error, stack);
};
for (var future in futures) {
future.then(onValue, onError: onError);
}
return completer.future;
}
/**
* Perform an async operation for each element of the iterable, in turn.
*

View file

@ -112,6 +112,47 @@ abstract class Stream<T> {
return controller.stream;
}
/**
* Create a stream from a group of futures.
*
* The stream reports the results of the futures on the stream in the order
* in which the futures complete.
*
* If some futures have completed before calling `Stream.fromFutures`,
* their result will be output on the created stream in some unspecified
* order.
*
* When all futures have completed, the stream is closed.
*
* If no future is passed, the stream closes as soon as possible.
*/
factory Stream.fromFutures(Iterable<Future<T>> futures) {
var controller = new StreamController<T>(sync: true);
int count = 0;
var onValue = (value) {
if (!controller.isClosed) {
controller._add(value);
if (--count == 0) controller._closeUnchecked();
}
};
var onError = (error, stack) {
if (!controller.isClosed) {
controller._addError(error, stack);
if (--count == 0) controller._closeUnchecked();
}
};
// The futures are already running, so start listening to them immediately
// (instead of waiting for the stream to be listened on).
// If we wait, we might not catch errors in the futures in time.
for (var future in futures) {
count++;
future.then(onValue, onError: onError);
}
// Use schedule microtask since controller is sync.
if (count == 0) scheduleMicrotask(controller.close);
return controller.stream;
}
/**
* Creates a single-subscription stream that gets its data from [data].
*

View file

@ -926,6 +926,74 @@ void testTypes() {
testType("Future.error", new Future<int>.error("ERR")..catchError((_){}));
}
void testAnyValue() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var result = Future.any(cs.map((x) => x.future));
result.then((v) {
Expect.equals(42, v);
asyncEnd();
}, onError: (e, s) {
Expect.fail("Unexpected error: $e");
});
cs[1].complete(42);
cs[2].complete(10);
cs[0].complete(20);
}
void testAnyError() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var result = Future.any(cs.map((x) => x.future));
result.then((v) {
Expect.fail("Unexpected value: $v");
}, onError: (e, s) {
Expect.equals(42, e);
asyncEnd();
});
cs[1].completeError(42);
cs[2].complete(10);
cs[0].complete(20);
}
void testAnyIgnoreIncomplete() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var result = Future.any(cs.map((x) => x.future));
result.then((v) {
Expect.equals(42, v);
asyncEnd();
}, onError: (e, s) {
Expect.fail("Unexpected error: $e");
});
cs[1].complete(42);
// The other two futures never complete.
}
void testAnyIgnoreError() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var result = Future.any(cs.map((x) => x.future));
result.then((v) {
Expect.equals(42, v);
asyncEnd();
}, onError: (e, s) {
Expect.fail("Unexpected error: $e");
});
cs[1].complete(42);
// The errors are ignored, not uncaught.
cs[2].completeError("BAD");
cs[0].completeError("BAD");
}
main() {
asyncStart();
@ -989,6 +1057,11 @@ main() {
testTypes();
testAnyValue();
testAnyError();
testAnyIgnoreIncomplete();
testAnyIgnoreError();
asyncEnd();
}

View file

@ -0,0 +1,130 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "package:expect/expect.dart";
import 'package:async_helper/async_helper.dart';
main() {
asyncStart();
testValues();
testErrors();
testMixed();
testOrdering();
testEmpty();
testPrecompleted();
asyncEnd();
}
void testValues() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var stream = new Stream.fromFutures(cs.map((x) => x.future));
var result = stream.toList();
result.then((list) {
Expect.listEquals([1, 2, 3], list);
asyncEnd();
});
cs[1].complete(1);
cs[2].complete(2);
cs[0].complete(3);
}
void testErrors() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var stream = new Stream.fromFutures(cs.map((x) => x.future));
int counter = 0;
stream.listen((_) {
Expect.fail("unexpected value");
}, onError: (e) {
Expect.equals(++counter, e);
}, onDone: () {
Expect.equals(3, counter);
asyncEnd();
});
cs[1].completeError(1);
cs[2].completeError(2);
cs[0].completeError(3);
}
void testMixed() {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var stream = new Stream.fromFutures(cs.map((x) => x.future));
int counter = 0;
stream.listen((v) {
Expect.isTrue(counter == 0 || counter == 2);
Expect.equals(++counter, v);
}, onError: (e) {
Expect.equals(++counter, 2);
Expect.equals(2, e);
}, onDone: () {
Expect.equals(3, counter);
asyncEnd();
});
cs[1].complete(1);
cs[2].completeError(2);
cs[0].complete(3);
}
void testOrdering() {
// The output is in completion order, not affected by the input future order.
test(n1, n2, n3) {
asyncStart();
var cs = new List.generate(3, (_) => new Completer());
var stream = new Stream.fromFutures(cs.map((x) => x.future));
var result = stream.toList();
result.then((list) {
Expect.listEquals([1, 2, 3], list);
asyncEnd();
});
cs[n1].complete(1);
cs[n2].complete(2);
cs[n3].complete(3);
}
test(0, 1, 2);
test(0, 2, 1);
test(1, 0, 2);
test(1, 2, 0);
test(2, 0, 1);
test(2, 1, 0);
}
void testEmpty() {
asyncStart();
var stream = new Stream.fromFutures([]);
stream.listen((_) {
Expect.fail("unexpected value");
}, onError: (e) {
Expect.fail("unexpected error");
}, onDone: () {
asyncEnd();
});
}
void testPrecompleted() {
asyncStart();
var stream = new Stream.fromFutures(
new Iterable.generate(3, (v) => new Future.value(v + 1)));
var expected = new Set.from([1, 2, 3]);
stream.listen((v) {
Expect.isTrue(expected.contains(v));
expected.remove(v);
}, onDone: () {
Expect.isTrue(expected.isEmpty);
asyncEnd();
});
}