Return futures on Stream.cancel when possible.

Deprecate returning `null`.

Also, fixes cases where transformations on a stream didn't forward the cancel future

Fixes #26777.

BUG= http://dartbug.com/26777.
R=lrn@google.com

Review URL: https://codereview.chromium.org/2202533003 .

Committed: 395e7aaa69
Reverted: 99e5328eac
Committed: 1905ddafaa
Reverted: 46a8579c1e
This commit is contained in:
Florian Loitsch 2016-09-05 17:41:22 +02:00
parent e8a46acb2c
commit 6255638cd0
10 changed files with 553 additions and 259 deletions

View file

@ -13,6 +13,10 @@
* `dart:async`
* `Future.wait` now catches synchronous errors and returns them in the
returned Future.
* More aggressively returns a Future on Stream.cancel operations.
Discourages to return `null` from `cancel`.
* Fixes a few bugs where the cancel future wasn't passed through
transformations.
* `dart:io`
* Added `WebSocket.addUtf8Text` to allow sending a pre-encoded text message
without a round-trip UTF-8 conversion.

View file

@ -232,6 +232,7 @@ abstract class Stream<T> {
onCancel: () {
if (timer != null) timer.cancel();
timer = null;
return Future._nullFuture;
});
return controller.stream;
}
@ -441,7 +442,7 @@ abstract class Stream<T> {
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () { subscription.cancel(); },
onCancel: () => subscription.cancel(),
sync: true
);
}
@ -499,7 +500,7 @@ abstract class Stream<T> {
onListen: onListen,
onPause: () { subscription.pause(); },
onResume: () { subscription.resume(); },
onCancel: () { subscription.cancel(); },
onCancel: () => subscription.cancel(),
sync: true
);
}
@ -1407,7 +1408,10 @@ abstract class StreamSubscription<T> {
* the subscription is canceled.
*
* Returns a future that is completed once the stream has finished
* its cleanup. May also return `null` if no cleanup was necessary.
* its cleanup.
*
* For historical reasons, may also return `null` if no cleanup was necessary.
* Returning `null` is deprecated and should be avoided.
*
* Typically, futures are returned when the stream needs to release resources.
* For example, a stream might need to close an open file (as an asynchronous
@ -1711,7 +1715,7 @@ abstract class StreamTransformer<S, T> {
* },
* onPause: () { subscription.pause(); },
* onResume: () { subscription.resume(); },
* onCancel: () { subscription.cancel(); },
* onCancel: () => subscription.cancel(),
* sync: true);
* return controller.stream.listen(null);
* });

View file

@ -188,9 +188,10 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// error or done event pending (waiting for the cancel to be done) discard
// that event.
_state &= ~_STATE_WAIT_FOR_CANCEL;
if (_isCanceled) return _cancelFuture;
if (!_isCanceled) {
_cancel();
return _cancelFuture;
}
return _cancelFuture ?? Future._nullFuture;
}
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
@ -199,8 +200,14 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
// Overwrite the onDone and onError handlers.
_onDone = () { result._complete(futureValue); };
_onError = (error, stackTrace) {
cancel();
Future cancelFuture = cancel();
if (!identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() {
result._completeError(error, stackTrace);
});
} else {
result._completeError(error, stackTrace);
}
};
return result;
@ -361,7 +368,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
if (_cancelOnError) {
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
if (_cancelFuture is Future) {
if (_cancelFuture is Future &&
!identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendError);
} else {
sendError();
@ -389,7 +397,8 @@ class _BufferingStreamSubscription<T> implements StreamSubscription<T>,
_cancel();
_state |= _STATE_WAIT_FOR_CANCEL;
if (_cancelFuture is Future) {
if (_cancelFuture is Future &&
!identical(_cancelFuture, Future._nullFuture)) {
_cancelFuture.whenComplete(sendDone);
} else {
sendDone();
@ -778,7 +787,7 @@ class _DoneStreamSubscription<T> implements StreamSubscription<T> {
}
}
Future cancel() => null;
Future cancel() => Future._nullFuture;
Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]) {
_Future/*<E>*/ result = new _Future/*<E>*/();
@ -916,7 +925,7 @@ class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
Future cancel() {
_stream._cancelSubscription();
return null;
return Future._nullFuture;
}
bool get isPaused {
@ -1032,7 +1041,7 @@ class _StreamIteratorImpl<T> implements StreamIterator<T> {
Future cancel() {
StreamSubscription subscription = _subscription;
if (subscription == null) return null;
if (subscription == null) return Future._nullFuture;
if (_state == _STATE_MOVING) {
_Future<bool> hasNext = _futureOrPrefetch as Object /*=_Future<bool>*/;
_clear();

View file

@ -29,7 +29,7 @@ void _cancelAndError(StreamSubscription subscription,
error,
StackTrace stackTrace) {
var cancelFuture = subscription.cancel();
if (cancelFuture is Future) {
if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
} else {
future._completeError(error, stackTrace);
@ -61,7 +61,7 @@ _ErrorCallback _cancelAndErrorClosure(
before completing with a value. */
void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
var cancelFuture = subscription.cancel();
if (cancelFuture is Future) {
if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
} else {
future._complete(value);

View file

@ -109,7 +109,7 @@ class _SinkTransformerStreamSubscription<S, T>
if (_isSubscribed) {
StreamSubscription subscription = _subscription;
_subscription = null;
subscription.cancel();
return subscription.cancel();
}
return null;
}

View file

@ -16,6 +16,7 @@ fail(e) { Expect.fail("Unexepected error: $e"); }
void testMultiController() {
// Test normal flow.
{
var c = new StreamController(sync: true);
Events expectedEvents = new Events()
..add(42)
@ -23,57 +24,68 @@ void testMultiController() {
..error("error!")
..error("error too!")
..close();
CaptureEvents actualEvents = new Events.capture(c.stream.asBroadcastStream());
CaptureEvents actualEvents =
new Events.capture(c.stream.asBroadcastStream());
expectedEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test automatic unsubscription on error.
c = new StreamController(sync: true);
expectedEvents = new Events()..add(42)..error("error");
actualEvents = new Events.capture(c.stream.asBroadcastStream(),
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add(42)..error("error");
var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
cancelOnError: true);
Events sentEvents =
new Events()..add(42)..error("error")..add("Are you there?");
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test manual unsubscription.
c = new StreamController(sync: true);
expectedEvents = new Events()..add(42)..error("error")..add(37);
actualEvents = new Events.capture(c.stream.asBroadcastStream(),
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add(42)..error("error")..add(37);
var actualEvents = new Events.capture(c.stream.asBroadcastStream(),
cancelOnError: false);
expectedEvents.replay(c);
actualEvents.subscription.cancel();
c.add("Are you there"); // Not sent to actualEvents.
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test filter.
c = new StreamController(sync: true);
expectedEvents = new Events()
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()
..add("a string")..add("another string")..close();
sentEvents = new Events()
var sentEvents = new Events()
..add("a string")..add(42)..add("another string")..close();
actualEvents = new Events.capture(c.stream
var actualEvents = new Events.capture(c.stream
.asBroadcastStream()
.where((v) => v is String));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test map.
c = new StreamController(sync: true);
expectedEvents = new Events()..add("abab")..error("error")..close();
sentEvents = new Events()..add("ab")..error("error")..close();
actualEvents = new Events.capture(c.stream
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add("abab")..error("error")..close();
var sentEvents = new Events()..add("ab")..error("error")..close();
var actualEvents = new Events.capture(c.stream
.asBroadcastStream()
.map((v) => "$v$v"));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test handleError.
c = new StreamController(sync: true);
expectedEvents = new Events()..add("ab")..error("[foo]");
sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
actualEvents = new Events.capture(c.stream
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add("ab")..error("[foo]");
var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
var actualEvents = new Events.capture(c.stream
.asBroadcastStream()
.handleError((error) {
if (error is String) {
@ -83,30 +95,36 @@ void testMultiController() {
}), cancelOnError: true);
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// reduce is tested asynchronously and therefore not in this file.
// Test expand
c = new StreamController(sync: true);
sentEvents = new Events()..add(3)..add(2)..add(4)..close();
expectedEvents = new Events()..add(1)..add(2)..add(3)
{
var c = new StreamController(sync: true);
var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
var expectedEvents = new Events()..add(1)..add(2)..add(3)
..add(1)..add(2)
..add(1)..add(2)..add(3)..add(4)
..close();
actualEvents = new Events.capture(c.stream.asBroadcastStream().expand((v) {
var actualEvents =
new Events.capture(c.stream.asBroadcastStream().expand((v) {
var l = [];
for (int i = 0; i < v; i++) l.add(i + 1);
return l;
}));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test transform.
c = new StreamController(sync: true);
sentEvents = new Events()..add("a")..error(42)..add("b")..close();
expectedEvents =
{
var c = new StreamController(sync: true);
var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
var expectedEvents =
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
actualEvents = new Events.capture(c.stream.asBroadcastStream().transform(
var actualEvents =
new Events.capture(c.stream.asBroadcastStream().transform(
new StreamTransformer.fromHandlers(
handleData: (v, s) { s.addError(v); },
handleError: (e, st, s) { s.add(e); },
@ -116,17 +134,19 @@ void testMultiController() {
})));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test multiple filters.
c = new StreamController(sync: true);
sentEvents = new Events()..add(42)
{
var c = new StreamController(sync: true);
var sentEvents = new Events()..add(42)
..add("snugglefluffy")
..add(7)
..add("42")
..error("not FormatException") // Unsubscribes.
..close();
expectedEvents = new Events()..add(42)..error("not FormatException");
actualEvents = new Events.capture(
var expectedEvents = new Events()..add(42)..error("not FormatException");
var actualEvents = new Events.capture(
c.stream.asBroadcastStream().where((v) => v is String)
.map((v) => int.parse(v))
.handleError((error) {
@ -136,9 +156,11 @@ void testMultiController() {
cancelOnError: true);
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test subscription changes while firing.
c = new StreamController(sync: true);
{
var c = new StreamController(sync: true);
var sink = c.sink;
var stream = c.stream.asBroadcastStream();
var counter = 0;
@ -161,10 +183,12 @@ void testMultiController() {
sink.add(4); // -"-
sink.add(5); // seen by stream 10
Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter);
}
}
testSingleController() {
// Test normal flow.
{
var c = new StreamController(sync: true);
Events expectedEvents = new Events()
..add(42)
@ -175,48 +199,58 @@ testSingleController() {
CaptureEvents actualEvents = new Events.capture(c.stream);
expectedEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test automatic unsubscription on error.
c = new StreamController(sync: true);
expectedEvents = new Events()..add(42)..error("error");
actualEvents = new Events.capture(c.stream, cancelOnError: true);
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add(42)..error("error");
var actualEvents = new Events.capture(c.stream, cancelOnError: true);
Events sentEvents =
new Events()..add(42)..error("error")..add("Are you there?");
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test manual unsubscription.
c = new StreamController(sync: true);
expectedEvents = new Events()..add(42)..error("error")..add(37);
actualEvents = new Events.capture(c.stream, cancelOnError: false);
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add(42)..error("error")..add(37);
var actualEvents = new Events.capture(c.stream, cancelOnError: false);
expectedEvents.replay(c);
actualEvents.subscription.cancel();
c.add("Are you there"); // Not sent to actualEvents.
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test filter.
c = new StreamController(sync: true);
expectedEvents = new Events()
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()
..add("a string")..add("another string")..close();
sentEvents = new Events()
var sentEvents = new Events()
..add("a string")..add(42)..add("another string")..close();
actualEvents = new Events.capture(c.stream.where((v) => v is String));
var actualEvents = new Events.capture(c.stream.where((v) => v is String));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test map.
c = new StreamController(sync: true);
expectedEvents = new Events()..add("abab")..error("error")..close();
sentEvents = new Events()..add("ab")..error("error")..close();
actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add("abab")..error("error")..close();
var sentEvents = new Events()..add("ab")..error("error")..close();
var actualEvents = new Events.capture(c.stream.map((v) => "$v$v"));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test handleError.
c = new StreamController(sync: true);
expectedEvents = new Events()..add("ab")..error("[foo]");
sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
actualEvents = new Events.capture(c.stream.handleError((error) {
{
var c = new StreamController(sync: true);
var expectedEvents = new Events()..add("ab")..error("[foo]");
var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close();
var actualEvents = new Events.capture(c.stream.handleError((error) {
if (error is String) {
// TODO(floitsch): this error originally changed the stack trace.
throw "[${error}]";
@ -224,29 +258,32 @@ testSingleController() {
}), cancelOnError: true);
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// reduce is tested asynchronously and therefore not in this file.
// Test expand
c = new StreamController(sync: true);
sentEvents = new Events()..add(3)..add(2)..add(4)..close();
expectedEvents = new Events()..add(1)..add(2)..add(3)
{
var c = new StreamController(sync: true);
var sentEvents = new Events()..add(3)..add(2)..add(4)..close();
var expectedEvents = new Events()..add(1)..add(2)..add(3)
..add(1)..add(2)
..add(1)..add(2)..add(3)..add(4)
..close();
actualEvents = new Events.capture(c.stream.expand((v) {
var actualEvents = new Events.capture(c.stream.expand((v) {
var l = [];
for (int i = 0; i < v; i++) l.add(i + 1);
return l;
}));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// test contains.
{
c = new StreamController(sync: true);
var c = new StreamController(sync: true);
// Error after match is not important.
sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
var sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close();
Future<bool> contains = c.stream.contains("x");
contains.then((var c) {
Expect.isTrue(c);
@ -255,9 +292,9 @@ testSingleController() {
}
{
c = new StreamController(sync: true);
var c = new StreamController(sync: true);
// Not matching is ok.
sentEvents = new Events()..add("a")..add("x")..add("b")..close();
var sentEvents = new Events()..add("a")..add("x")..add("b")..close();
Future<bool> contains = c.stream.contains("y");
contains.then((var c) {
Expect.isFalse(c);
@ -266,9 +303,9 @@ testSingleController() {
}
{
c = new StreamController(sync: true);
var c = new StreamController(sync: true);
// Error before match makes future err.
sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
var sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close();
Future<bool> contains = c.stream.contains("b");
contains.then((var c) {
Expect.fail("no value expected");
@ -279,11 +316,12 @@ testSingleController() {
}
// Test transform.
c = new StreamController(sync: true);
sentEvents = new Events()..add("a")..error(42)..add("b")..close();
expectedEvents =
{
var c = new StreamController(sync: true);
var sentEvents = new Events()..add("a")..error(42)..add("b")..close();
var expectedEvents =
new Events()..error("a")..add(42)..error("b")..add("foo")..close();
actualEvents = new Events.capture(c.stream.transform(
var actualEvents = new Events.capture(c.stream.transform(
new StreamTransformer.fromHandlers(
handleData: (v, s) { s.addError(v); },
handleError: (e, st, s) { s.add(e); },
@ -293,17 +331,19 @@ testSingleController() {
})));
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test multiple filters.
c = new StreamController(sync: true);
sentEvents = new Events()..add(42)
{
var c = new StreamController(sync: true);
var sentEvents = new Events()..add(42)
..add("snugglefluffy")
..add(7)
..add("42")
..error("not FormatException") // Unsubscribes.
..close();
expectedEvents = new Events()..add(42)..error("not FormatException");
actualEvents = new Events.capture(
var expectedEvents = new Events()..add(42)..error("not FormatException");
var actualEvents = new Events.capture(
c.stream.where((v) => v is String)
.map((v) => int.parse(v))
.handleError((error) {
@ -313,9 +353,11 @@ testSingleController() {
cancelOnError: true);
sentEvents.replay(c);
Expect.listEquals(expectedEvents.events, actualEvents.events);
}
// Test that only one subscription is allowed.
c = new StreamController(sync: true);
{
var c = new StreamController(sync: true);
var sink = c.sink;
var stream = c.stream;
var counter = 0;
@ -324,6 +366,7 @@ testSingleController() {
sink.add(1);
Expect.equals(1, counter);
c.close();
}
}
testExtraMethods() {

View file

@ -16,7 +16,10 @@ main() {
subscription = stream.listen(expectAsync((data) {
expect(data, isNull);
receivedCount++;
if (receivedCount == 5) subscription.cancel();
if (receivedCount == 5) {
var future = subscription.cancel();
expect(future, completes);
}
}, count: 5));
});
}

View file

@ -10,7 +10,7 @@ import 'dart:async';
import 'package:unittest/unittest.dart';
main() {
test("subscription.asStream success", () {
test("subscription.asFuture success", () {
Stream stream = new Stream.fromIterable([1, 2, 3]);
var output = [];
var subscription = stream.listen((x) { output.add(x); });
@ -19,7 +19,7 @@ main() {
}));
});
test("subscription.asStream success2", () {
test("subscription.asFuture success2", () {
StreamController controller = new StreamController(sync: true);
[1, 2, 3].forEach(controller.add);
controller.close();
@ -31,7 +31,7 @@ main() {
}));
});
test("subscription.asStream success 3", () {
test("subscription.asFuture success 3", () {
Stream stream = new Stream.fromIterable([1, 2, 3]).map((x) => x);
var output = [];
var subscription = stream.listen((x) { output.add(x); });
@ -40,7 +40,7 @@ main() {
}));
});
test("subscription.asStream different type", () {
test("subscription.asFuture different type", () {
Stream stream = new Stream<int>.fromIterable([1, 2, 3]);
var asyncCallback = expectAsync(() => {});
var output = [];
@ -52,7 +52,7 @@ main() {
});
});
test("subscription.asStream failure", () {
test("subscription.asFuture failure", () {
StreamController controller = new StreamController(sync: true);
[1, 2, 3].forEach(controller.add);
controller.addError("foo");
@ -65,7 +65,7 @@ main() {
}));
});
test("subscription.asStream failure2", () {
test("subscription.asFuture failure2", () {
Stream stream = new Stream.fromIterable([1, 2, 3, 4])
.map((x) {
if (x == 4) throw "foo";
@ -77,4 +77,50 @@ main() {
Expect.equals(error, "foo");
}));
});
test("subscription.asFuture delayed cancel", () {
var completer = new Completer();
var controller =
new StreamController(onCancel: () => completer.future, sync: true);
[1, 2, 3].forEach(controller.add);
controller.addError("foo");
controller.close();
Stream stream = controller.stream;
var output = [];
var subscription = stream.listen((x) { output.add(x); });
bool catchErrorHasRun = false;
subscription.asFuture(output).catchError(expectAsync((error) {
Expect.equals(error, "foo");
catchErrorHasRun = true;
}));
Timer.run(expectAsync(() {
Expect.isFalse(catchErrorHasRun);
completer.complete();
}));
});
test("subscription.asFuture failure in cancel", () {
runZoned(() {
var completer = new Completer();
var controller =
new StreamController(onCancel: () => completer.future, sync: true);
[1, 2, 3].forEach(controller.add);
controller.addError("foo");
controller.close();
Stream stream = controller.stream;
var output = [];
var subscription = stream.listen((x) { output.add(x); });
bool catchErrorHasRun = false;
subscription.asFuture(output).catchError(expectAsync((error) {
Expect.equals(error, "foo");
catchErrorHasRun = true;
}));
Timer.run(expectAsync(() {
Expect.isFalse(catchErrorHasRun);
completer.completeError(499);
}));
}, onError: expectAsync((e) {
Expect.equals(499, e);
}));
});
}

View file

@ -27,17 +27,22 @@ void main() {
test('subscription.cancel after close', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: completer.complete);
onCancel: () {
completer.complete();
return completer.future;
});
controller.close();
var completer2 = new Completer();
var sub;
void onDone() {
expect(sub.cancel(), isNull);
sub.cancel().then(completer2.complete);
}
sub = controller.stream.listen(null, onDone: onDone);
expect(completer.future, completes);
expect(completer2.future, completes);
});
test('subscription.cancel after error', () {
@ -135,4 +140,183 @@ void main() {
.cancel();
expect(doneCompleter.future, completion(equals(true)));
});
test('subscription.cancel through map', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.map((x) => x).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through asyncMap', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.asyncMap((x) => x).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through asyncExpand', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.asyncExpand((x) => x).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through handleError', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.handleError((x) => x).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through skip', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.skip(1).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through take', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.take(1).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through skipWhile', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.skipWhile((x) => true).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through takeWhile', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.takeWhile((x) => true).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through timeOut', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var duration = const Duration(hours: 5);
var future = controller.stream.timeout(duration).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through transform', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var transformer =
new StreamTransformer.fromHandlers(handleData: (x, y) {});
var future = controller.stream.transform(transformer).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
test('subscription.cancel through where', () {
var completer = new Completer();
StreamController controller = new StreamController(
onCancel: () => completer.future);
bool done = false;
var future = controller.stream.where((x) => true).listen(null).cancel();
expect(future.then((_) => done = true), completion(equals(true)));
Timer.run(() {
expect(done, isFalse);
completer.complete();
});
});
}

View file

@ -187,6 +187,7 @@ async/stream_asyncexpand_test: RuntimeError # Timer interface not supported: Iss
async/stream_asyncmap_test: RuntimeError # Timer interface not supported: Issue 7728.
async/stream_transformation_broadcast_test: RuntimeError # Timer interface not supported: Issue 7728.
async/stream_controller_test: Fail # Timer interface not supported: Issue 7728.
async/stream_subscription_cancel_test: Fail # Timer interface not supported: Issue 7728.
async/future_constructor2_test: Fail # Timer interface not supported: Issue 7728.
mirrors/mirrors_reader_test: Skip # Running in v8 suffices. Issue 16589 - RuntimeError. Issue 22130 - Crash (out of memory).