diff --git a/CHANGELOG.md b/CHANGELOG.md index 75439f763ec..35da026d4e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,12 +16,6 @@ * Report a better error when a bind fails because of a bad source address. * Handle HTTP header `charset` parameter with empty value. -* `dart:async` - * More aggressively returns a Future on Stream.cancel operations. - Discourages to return `null` from `cancel`. - * Fixes a few bugs where the cancel future wasn't passed through - transformations. - ### Strong Mode * New feature - an option to disable implicit casts diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart index f4abf9aac9c..7661d9ffe99 100644 --- a/sdk/lib/async/stream.dart +++ b/sdk/lib/async/stream.dart @@ -232,7 +232,6 @@ abstract class Stream { onCancel: () { if (timer != null) timer.cancel(); timer = null; - return Future._nullFuture; }); return controller.stream; } @@ -442,7 +441,7 @@ abstract class Stream { onListen: onListen, onPause: () { subscription.pause(); }, onResume: () { subscription.resume(); }, - onCancel: () => subscription.cancel(), + onCancel: () { subscription.cancel(); }, sync: true ); } @@ -500,7 +499,7 @@ abstract class Stream { onListen: onListen, onPause: () { subscription.pause(); }, onResume: () { subscription.resume(); }, - onCancel: () => subscription.cancel(), + onCancel: () { subscription.cancel(); }, sync: true ); } @@ -1408,10 +1407,7 @@ abstract class StreamSubscription { * the subscription is canceled. * * Returns a future that is completed once the stream has finished - * its cleanup. - * - * For historical reasons, may also return `null` if no cleanup was necessary. - * Returning `null` is deprecated and should be avoided. + * its cleanup. May also return `null` if no cleanup was necessary. * * Typically, futures are returned when the stream needs to release resources. * For example, a stream might need to close an open file (as an asynchronous @@ -1715,7 +1711,7 @@ abstract class StreamTransformer { * }, * onPause: () { subscription.pause(); }, * onResume: () { subscription.resume(); }, - * onCancel: () => subscription.cancel(), + * onCancel: () { subscription.cancel(); }, * sync: true); * return controller.stream.listen(null); * }); diff --git a/sdk/lib/async/stream_impl.dart b/sdk/lib/async/stream_impl.dart index cb2e20d0f13..8c9af64bb9e 100644 --- a/sdk/lib/async/stream_impl.dart +++ b/sdk/lib/async/stream_impl.dart @@ -188,10 +188,9 @@ class _BufferingStreamSubscription implements StreamSubscription, // error or done event pending (waiting for the cancel to be done) discard // that event. _state &= ~_STATE_WAIT_FOR_CANCEL; - if (!_isCanceled) { - _cancel(); - } - return _cancelFuture ?? Future._nullFuture; + if (_isCanceled) return _cancelFuture; + _cancel(); + return _cancelFuture; } Future/**/ asFuture/**/([var/*=E*/ futureValue]) { @@ -200,14 +199,8 @@ class _BufferingStreamSubscription implements StreamSubscription, // Overwrite the onDone and onError handlers. _onDone = () { result._complete(futureValue); }; _onError = (error, stackTrace) { - Future cancelFuture = cancel(); - if (!identical(cancelFuture, Future._nullFuture)) { - cancelFuture.whenComplete(() { - result._completeError(error, stackTrace); - }); - } else { - result._completeError(error, stackTrace); - } + cancel(); + result._completeError(error, stackTrace); }; return result; @@ -368,8 +361,7 @@ class _BufferingStreamSubscription implements StreamSubscription, if (_cancelOnError) { _state |= _STATE_WAIT_FOR_CANCEL; _cancel(); - if (_cancelFuture is Future && - !identical(_cancelFuture, Future._nullFuture)) { + if (_cancelFuture is Future) { _cancelFuture.whenComplete(sendError); } else { sendError(); @@ -397,8 +389,7 @@ class _BufferingStreamSubscription implements StreamSubscription, _cancel(); _state |= _STATE_WAIT_FOR_CANCEL; - if (_cancelFuture is Future && - !identical(_cancelFuture, Future._nullFuture)) { + if (_cancelFuture is Future) { _cancelFuture.whenComplete(sendDone); } else { sendDone(); @@ -787,7 +778,7 @@ class _DoneStreamSubscription implements StreamSubscription { } } - Future cancel() => Future._nullFuture; + Future cancel() => null; Future/**/ asFuture/**/([var/*=E*/ futureValue]) { _Future/**/ result = new _Future/**/(); @@ -925,7 +916,7 @@ class _BroadcastSubscriptionWrapper implements StreamSubscription { Future cancel() { _stream._cancelSubscription(); - return Future._nullFuture; + return null; } bool get isPaused { @@ -1041,7 +1032,7 @@ class _StreamIteratorImpl implements StreamIterator { Future cancel() { StreamSubscription subscription = _subscription; - if (subscription == null) return Future._nullFuture; + if (subscription == null) return null; if (_state == _STATE_MOVING) { _Future hasNext = _futureOrPrefetch as Object /*=_Future*/; _clear(); diff --git a/sdk/lib/async/stream_transformers.dart b/sdk/lib/async/stream_transformers.dart index 7542c6b88cd..e0982da4260 100644 --- a/sdk/lib/async/stream_transformers.dart +++ b/sdk/lib/async/stream_transformers.dart @@ -109,7 +109,7 @@ class _SinkTransformerStreamSubscription if (_isSubscribed) { StreamSubscription subscription = _subscription; _subscription = null; - return subscription.cancel(); + subscription.cancel(); } return null; } diff --git a/tests/lib/async/stream_controller_test.dart b/tests/lib/async/stream_controller_test.dart index 5b019320843..afc7bb53cd9 100644 --- a/tests/lib/async/stream_controller_test.dart +++ b/tests/lib/async/stream_controller_test.dart @@ -16,274 +16,237 @@ fail(e) { Expect.fail("Unexepected error: $e"); } void testMultiController() { // Test normal flow. - { - var c = new StreamController(sync: true); - Events expectedEvents = new Events() - ..add(42) - ..add("dibs") - ..error("error!") - ..error("error too!") - ..close(); - CaptureEvents actualEvents = - new Events.capture(c.stream.asBroadcastStream()); - expectedEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + var c = new StreamController(sync: true); + Events expectedEvents = new Events() + ..add(42) + ..add("dibs") + ..error("error!") + ..error("error too!") + ..close(); + CaptureEvents actualEvents = new Events.capture(c.stream.asBroadcastStream()); + expectedEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test automatic unsubscription on error. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add(42)..error("error"); - var actualEvents = new Events.capture(c.stream.asBroadcastStream(), - cancelOnError: true); - Events sentEvents = - new Events()..add(42)..error("error")..add("Are you there?"); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add(42)..error("error"); + actualEvents = new Events.capture(c.stream.asBroadcastStream(), + cancelOnError: true); + Events sentEvents = + new Events()..add(42)..error("error")..add("Are you there?"); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test manual unsubscription. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add(42)..error("error")..add(37); - var actualEvents = new Events.capture(c.stream.asBroadcastStream(), - cancelOnError: false); - expectedEvents.replay(c); - actualEvents.subscription.cancel(); - c.add("Are you there"); // Not sent to actualEvents. - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add(42)..error("error")..add(37); + actualEvents = new Events.capture(c.stream.asBroadcastStream(), + cancelOnError: false); + expectedEvents.replay(c); + actualEvents.subscription.cancel(); + c.add("Are you there"); // Not sent to actualEvents. + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test filter. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events() - ..add("a string")..add("another string")..close(); - var sentEvents = new Events() - ..add("a string")..add(42)..add("another string")..close(); - var actualEvents = new Events.capture(c.stream - .asBroadcastStream() - .where((v) => v is String)); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events() + ..add("a string")..add("another string")..close(); + sentEvents = new Events() + ..add("a string")..add(42)..add("another string")..close(); + actualEvents = new Events.capture(c.stream + .asBroadcastStream() + .where((v) => v is String)); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test map. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add("abab")..error("error")..close(); - var sentEvents = new Events()..add("ab")..error("error")..close(); - var actualEvents = new Events.capture(c.stream - .asBroadcastStream() - .map((v) => "$v$v")); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add("abab")..error("error")..close(); + sentEvents = new Events()..add("ab")..error("error")..close(); + actualEvents = new Events.capture(c.stream + .asBroadcastStream() + .map((v) => "$v$v")); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test handleError. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add("ab")..error("[foo]"); - var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close(); - var actualEvents = new Events.capture(c.stream - .asBroadcastStream() - .handleError((error) { - if (error is String) { - // TODO(floitsch): this test originally changed the stacktrace. - throw "[${error}]"; - } - }), cancelOnError: true); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add("ab")..error("[foo]"); + sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close(); + actualEvents = new Events.capture(c.stream + .asBroadcastStream() + .handleError((error) { + if (error is String) { + // TODO(floitsch): this test originally changed the stacktrace. + throw "[${error}]"; + } + }), cancelOnError: true); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // reduce is tested asynchronously and therefore not in this file. // Test expand - { - var c = new StreamController(sync: true); - var sentEvents = new Events()..add(3)..add(2)..add(4)..close(); - var expectedEvents = new Events()..add(1)..add(2)..add(3) - ..add(1)..add(2) - ..add(1)..add(2)..add(3)..add(4) - ..close(); - var actualEvents = - new Events.capture(c.stream.asBroadcastStream().expand((v) { - var l = []; - for (int i = 0; i < v; i++) l.add(i + 1); - return l; - })); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + sentEvents = new Events()..add(3)..add(2)..add(4)..close(); + expectedEvents = new Events()..add(1)..add(2)..add(3) + ..add(1)..add(2) + ..add(1)..add(2)..add(3)..add(4) + ..close(); + actualEvents = new Events.capture(c.stream.asBroadcastStream().expand((v) { + var l = []; + for (int i = 0; i < v; i++) l.add(i + 1); + return l; + })); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test transform. - { - var c = new StreamController(sync: true); - var sentEvents = new Events()..add("a")..error(42)..add("b")..close(); - var expectedEvents = - new Events()..error("a")..add(42)..error("b")..add("foo")..close(); - var actualEvents = - new Events.capture(c.stream.asBroadcastStream().transform( - new StreamTransformer.fromHandlers( - handleData: (v, s) { s.addError(v); }, - handleError: (e, st, s) { s.add(e); }, - handleDone: (s) { - s.add("foo"); - s.close(); - }))); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + sentEvents = new Events()..add("a")..error(42)..add("b")..close(); + expectedEvents = + new Events()..error("a")..add(42)..error("b")..add("foo")..close(); + actualEvents = new Events.capture(c.stream.asBroadcastStream().transform( + new StreamTransformer.fromHandlers( + handleData: (v, s) { s.addError(v); }, + handleError: (e, st, s) { s.add(e); }, + handleDone: (s) { + s.add("foo"); + s.close(); + }))); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test multiple filters. - { - var c = new StreamController(sync: true); - var sentEvents = new Events()..add(42) - ..add("snugglefluffy") - ..add(7) - ..add("42") - ..error("not FormatException") // Unsubscribes. - ..close(); - var expectedEvents = new Events()..add(42)..error("not FormatException"); - var actualEvents = new Events.capture( - c.stream.asBroadcastStream().where((v) => v is String) - .map((v) => int.parse(v)) - .handleError((error) { - if (error is! FormatException) throw error; - }) - .where((v) => v > 10), - cancelOnError: true); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + sentEvents = new Events()..add(42) + ..add("snugglefluffy") + ..add(7) + ..add("42") + ..error("not FormatException") // Unsubscribes. + ..close(); + expectedEvents = new Events()..add(42)..error("not FormatException"); + actualEvents = new Events.capture( + c.stream.asBroadcastStream().where((v) => v is String) + .map((v) => int.parse(v)) + .handleError((error) { + if (error is! FormatException) throw error; + }) + .where((v) => v > 10), + cancelOnError: true); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test subscription changes while firing. - { - var c = new StreamController(sync: true); - var sink = c.sink; - var stream = c.stream.asBroadcastStream(); - var counter = 0; - var subscription = stream.listen(null); - subscription.onData((data) { - counter += data; - subscription.cancel(); - stream.listen((data) { - counter += 10 * data; - }); - var subscription2 = stream.listen(null); - subscription2.onData((data) { - counter += 100 * data; - if (data == 4) subscription2.cancel(); - }); + c = new StreamController(sync: true); + var sink = c.sink; + var stream = c.stream.asBroadcastStream(); + var counter = 0; + var subscription = stream.listen(null); + subscription.onData((data) { + counter += data; + subscription.cancel(); + stream.listen((data) { + counter += 10 * data; }); - sink.add(1); // seen by stream 1 - sink.add(2); // seen by stream 10 and 100 - sink.add(3); // -"- - sink.add(4); // -"- - sink.add(5); // seen by stream 10 - Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter); - } + var subscription2 = stream.listen(null); + subscription2.onData((data) { + counter += 100 * data; + if (data == 4) subscription2.cancel(); + }); + }); + sink.add(1); // seen by stream 1 + sink.add(2); // seen by stream 10 and 100 + sink.add(3); // -"- + sink.add(4); // -"- + sink.add(5); // seen by stream 10 + Expect.equals(1 + 20 + 200 + 30 + 300 + 40 + 400 + 50, counter); } testSingleController() { // Test normal flow. - { - var c = new StreamController(sync: true); - Events expectedEvents = new Events() - ..add(42) - ..add("dibs") - ..error("error!") - ..error("error too!") - ..close(); - CaptureEvents actualEvents = new Events.capture(c.stream); - expectedEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + var c = new StreamController(sync: true); + Events expectedEvents = new Events() + ..add(42) + ..add("dibs") + ..error("error!") + ..error("error too!") + ..close(); + CaptureEvents actualEvents = new Events.capture(c.stream); + expectedEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test automatic unsubscription on error. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add(42)..error("error"); - var actualEvents = new Events.capture(c.stream, cancelOnError: true); - Events sentEvents = - new Events()..add(42)..error("error")..add("Are you there?"); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add(42)..error("error"); + actualEvents = new Events.capture(c.stream, cancelOnError: true); + Events sentEvents = + new Events()..add(42)..error("error")..add("Are you there?"); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test manual unsubscription. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add(42)..error("error")..add(37); - var actualEvents = new Events.capture(c.stream, cancelOnError: false); - expectedEvents.replay(c); - actualEvents.subscription.cancel(); - c.add("Are you there"); // Not sent to actualEvents. - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add(42)..error("error")..add(37); + actualEvents = new Events.capture(c.stream, cancelOnError: false); + expectedEvents.replay(c); + actualEvents.subscription.cancel(); + c.add("Are you there"); // Not sent to actualEvents. + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test filter. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events() - ..add("a string")..add("another string")..close(); - var sentEvents = new Events() - ..add("a string")..add(42)..add("another string")..close(); - var actualEvents = new Events.capture(c.stream.where((v) => v is String)); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events() + ..add("a string")..add("another string")..close(); + sentEvents = new Events() + ..add("a string")..add(42)..add("another string")..close(); + actualEvents = new Events.capture(c.stream.where((v) => v is String)); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test map. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add("abab")..error("error")..close(); - var sentEvents = new Events()..add("ab")..error("error")..close(); - var actualEvents = new Events.capture(c.stream.map((v) => "$v$v")); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add("abab")..error("error")..close(); + sentEvents = new Events()..add("ab")..error("error")..close(); + actualEvents = new Events.capture(c.stream.map((v) => "$v$v")); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test handleError. - { - var c = new StreamController(sync: true); - var expectedEvents = new Events()..add("ab")..error("[foo]"); - var sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close(); - var actualEvents = new Events.capture(c.stream.handleError((error) { - if (error is String) { - // TODO(floitsch): this error originally changed the stack trace. - throw "[${error}]"; - } - }), cancelOnError: true); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + expectedEvents = new Events()..add("ab")..error("[foo]"); + sentEvents = new Events()..add("ab")..error("foo")..add("ab")..close(); + actualEvents = new Events.capture(c.stream.handleError((error) { + if (error is String) { + // TODO(floitsch): this error originally changed the stack trace. + throw "[${error}]"; + } + }), cancelOnError: true); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // reduce is tested asynchronously and therefore not in this file. // Test expand - { - var c = new StreamController(sync: true); - var sentEvents = new Events()..add(3)..add(2)..add(4)..close(); - var expectedEvents = new Events()..add(1)..add(2)..add(3) - ..add(1)..add(2) - ..add(1)..add(2)..add(3)..add(4) - ..close(); - var actualEvents = new Events.capture(c.stream.expand((v) { - var l = []; - for (int i = 0; i < v; i++) l.add(i + 1); - return l; - })); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + sentEvents = new Events()..add(3)..add(2)..add(4)..close(); + expectedEvents = new Events()..add(1)..add(2)..add(3) + ..add(1)..add(2) + ..add(1)..add(2)..add(3)..add(4) + ..close(); + actualEvents = new Events.capture(c.stream.expand((v) { + var l = []; + for (int i = 0; i < v; i++) l.add(i + 1); + return l; + })); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // test contains. { - var c = new StreamController(sync: true); + c = new StreamController(sync: true); // Error after match is not important. - var sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close(); + sentEvents = new Events()..add("a")..add("x")..error("FAIL")..close(); Future contains = c.stream.contains("x"); contains.then((var c) { Expect.isTrue(c); @@ -292,9 +255,9 @@ testSingleController() { } { - var c = new StreamController(sync: true); + c = new StreamController(sync: true); // Not matching is ok. - var sentEvents = new Events()..add("a")..add("x")..add("b")..close(); + sentEvents = new Events()..add("a")..add("x")..add("b")..close(); Future contains = c.stream.contains("y"); contains.then((var c) { Expect.isFalse(c); @@ -303,9 +266,9 @@ testSingleController() { } { - var c = new StreamController(sync: true); + c = new StreamController(sync: true); // Error before match makes future err. - var sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close(); + sentEvents = new Events()..add("a")..error("FAIL")..add("b")..close(); Future contains = c.stream.contains("b"); contains.then((var c) { Expect.fail("no value expected"); @@ -316,57 +279,51 @@ testSingleController() { } // Test transform. - { - var c = new StreamController(sync: true); - var sentEvents = new Events()..add("a")..error(42)..add("b")..close(); - var expectedEvents = - new Events()..error("a")..add(42)..error("b")..add("foo")..close(); - var actualEvents = new Events.capture(c.stream.transform( - new StreamTransformer.fromHandlers( - handleData: (v, s) { s.addError(v); }, - handleError: (e, st, s) { s.add(e); }, - handleDone: (s) { - s.add("foo"); - s.close(); - }))); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + sentEvents = new Events()..add("a")..error(42)..add("b")..close(); + expectedEvents = + new Events()..error("a")..add(42)..error("b")..add("foo")..close(); + actualEvents = new Events.capture(c.stream.transform( + new StreamTransformer.fromHandlers( + handleData: (v, s) { s.addError(v); }, + handleError: (e, st, s) { s.add(e); }, + handleDone: (s) { + s.add("foo"); + s.close(); + }))); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test multiple filters. - { - var c = new StreamController(sync: true); - var sentEvents = new Events()..add(42) - ..add("snugglefluffy") - ..add(7) - ..add("42") - ..error("not FormatException") // Unsubscribes. - ..close(); - var expectedEvents = new Events()..add(42)..error("not FormatException"); - var actualEvents = new Events.capture( - c.stream.where((v) => v is String) - .map((v) => int.parse(v)) - .handleError((error) { - if (error is! FormatException) throw error; - }) - .where((v) => v > 10), - cancelOnError: true); - sentEvents.replay(c); - Expect.listEquals(expectedEvents.events, actualEvents.events); - } + c = new StreamController(sync: true); + sentEvents = new Events()..add(42) + ..add("snugglefluffy") + ..add(7) + ..add("42") + ..error("not FormatException") // Unsubscribes. + ..close(); + expectedEvents = new Events()..add(42)..error("not FormatException"); + actualEvents = new Events.capture( + c.stream.where((v) => v is String) + .map((v) => int.parse(v)) + .handleError((error) { + if (error is! FormatException) throw error; + }) + .where((v) => v > 10), + cancelOnError: true); + sentEvents.replay(c); + Expect.listEquals(expectedEvents.events, actualEvents.events); // Test that only one subscription is allowed. - { - var c = new StreamController(sync: true); - var sink = c.sink; - var stream = c.stream; - var counter = 0; - var subscription = stream.listen((data) { counter += data; }); - Expect.throws(() => stream.listen(null), (e) => e is StateError); - sink.add(1); - Expect.equals(1, counter); - c.close(); - } + c = new StreamController(sync: true); + var sink = c.sink; + var stream = c.stream; + var counter = 0; + var subscription = stream.listen((data) { counter += data; }); + Expect.throws(() => stream.listen(null), (e) => e is StateError); + sink.add(1); + Expect.equals(1, counter); + c.close(); } testExtraMethods() { diff --git a/tests/lib/async/stream_periodic_test.dart b/tests/lib/async/stream_periodic_test.dart index f755d76ef41..b333966065b 100644 --- a/tests/lib/async/stream_periodic_test.dart +++ b/tests/lib/async/stream_periodic_test.dart @@ -16,10 +16,7 @@ main() { subscription = stream.listen(expectAsync((data) { expect(data, isNull); receivedCount++; - if (receivedCount == 5) { - var future = subscription.cancel(); - expect(future, completes); - } + if (receivedCount == 5) subscription.cancel(); }, count: 5)); }); } diff --git a/tests/lib/async/stream_subscription_as_future_test.dart b/tests/lib/async/stream_subscription_as_future_test.dart index 841298b1990..ac1293b0297 100644 --- a/tests/lib/async/stream_subscription_as_future_test.dart +++ b/tests/lib/async/stream_subscription_as_future_test.dart @@ -10,7 +10,7 @@ import 'dart:async'; import 'package:unittest/unittest.dart'; main() { - test("subscription.asFuture success", () { + test("subscription.asStream success", () { Stream stream = new Stream.fromIterable([1, 2, 3]); var output = []; var subscription = stream.listen((x) { output.add(x); }); @@ -19,7 +19,7 @@ main() { })); }); - test("subscription.asFuture success2", () { + test("subscription.asStream success2", () { StreamController controller = new StreamController(sync: true); [1, 2, 3].forEach(controller.add); controller.close(); @@ -31,7 +31,7 @@ main() { })); }); - test("subscription.asFuture success 3", () { + test("subscription.asStream success 3", () { Stream stream = new Stream.fromIterable([1, 2, 3]).map((x) => x); var output = []; var subscription = stream.listen((x) { output.add(x); }); @@ -40,7 +40,7 @@ main() { })); }); - test("subscription.asFuture different type", () { + test("subscription.asStream different type", () { Stream stream = new Stream.fromIterable([1, 2, 3]); var asyncCallback = expectAsync(() => {}); var output = []; @@ -52,7 +52,7 @@ main() { }); }); - test("subscription.asFuture failure", () { + test("subscription.asStream failure", () { StreamController controller = new StreamController(sync: true); [1, 2, 3].forEach(controller.add); controller.addError("foo"); @@ -65,7 +65,7 @@ main() { })); }); - test("subscription.asFuture failure2", () { + test("subscription.asStream failure2", () { Stream stream = new Stream.fromIterable([1, 2, 3, 4]) .map((x) { if (x == 4) throw "foo"; @@ -77,50 +77,4 @@ main() { Expect.equals(error, "foo"); })); }); - - test("subscription.asFuture delayed cancel", () { - var completer = new Completer(); - var controller = - new StreamController(onCancel: () => completer.future, sync: true); - [1, 2, 3].forEach(controller.add); - controller.addError("foo"); - controller.close(); - Stream stream = controller.stream; - var output = []; - var subscription = stream.listen((x) { output.add(x); }); - bool catchErrorHasRun = false; - subscription.asFuture(output).catchError(expectAsync((error) { - Expect.equals(error, "foo"); - catchErrorHasRun = true; - })); - Timer.run(expectAsync(() { - Expect.isFalse(catchErrorHasRun); - completer.complete(); - })); - }); - - test("subscription.asFuture failure in cancel", () { - runZoned(() { - var completer = new Completer(); - var controller = - new StreamController(onCancel: () => completer.future, sync: true); - [1, 2, 3].forEach(controller.add); - controller.addError("foo"); - controller.close(); - Stream stream = controller.stream; - var output = []; - var subscription = stream.listen((x) { output.add(x); }); - bool catchErrorHasRun = false; - subscription.asFuture(output).catchError(expectAsync((error) { - Expect.equals(error, "foo"); - catchErrorHasRun = true; - })); - Timer.run(expectAsync(() { - Expect.isFalse(catchErrorHasRun); - completer.completeError(499); - })); - }, onError: expectAsync((e) { - Expect.equals(499, e); - })); - }); } diff --git a/tests/lib/async/stream_subscription_cancel_test.dart b/tests/lib/async/stream_subscription_cancel_test.dart index 48649b2717d..d6c6987545f 100644 --- a/tests/lib/async/stream_subscription_cancel_test.dart +++ b/tests/lib/async/stream_subscription_cancel_test.dart @@ -27,22 +27,17 @@ void main() { test('subscription.cancel after close', () { var completer = new Completer(); StreamController controller = new StreamController( - onCancel: () { - completer.complete(); - return completer.future; - }); + onCancel: completer.complete); controller.close(); - var completer2 = new Completer(); var sub; void onDone() { - sub.cancel().then(completer2.complete); + expect(sub.cancel(), isNull); } sub = controller.stream.listen(null, onDone: onDone); expect(completer.future, completes); - expect(completer2.future, completes); }); test('subscription.cancel after error', () { @@ -140,183 +135,4 @@ void main() { .cancel(); expect(doneCompleter.future, completion(equals(true))); }); - - test('subscription.cancel through map', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.map((x) => x).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through asyncMap', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.asyncMap((x) => x).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through asyncExpand', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.asyncExpand((x) => x).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through handleError', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.handleError((x) => x).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through skip', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.skip(1).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through take', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.take(1).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through skipWhile', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.skipWhile((x) => true).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through takeWhile', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.takeWhile((x) => true).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through timeOut', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var duration = const Duration(hours: 5); - var future = controller.stream.timeout(duration).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through transform', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var transformer = - new StreamTransformer.fromHandlers(handleData: (x, y) {}); - var future = controller.stream.transform(transformer).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); - - test('subscription.cancel through where', () { - var completer = new Completer(); - StreamController controller = new StreamController( - onCancel: () => completer.future); - - bool done = false; - var future = controller.stream.where((x) => true).listen(null).cancel(); - - expect(future.then((_) => done = true), completion(equals(true))); - - Timer.run(() { - expect(done, isFalse); - completer.complete(); - }); - }); } diff --git a/tests/lib/lib.status b/tests/lib/lib.status index 4b30359c2b1..469b3b3023d 100644 --- a/tests/lib/lib.status +++ b/tests/lib/lib.status @@ -166,7 +166,6 @@ async/stream_asyncexpand_test: RuntimeError # Timer interface not supported: Iss async/stream_asyncmap_test: RuntimeError # Timer interface not supported: Issue 7728. async/stream_transformation_broadcast_test: RuntimeError # Timer interface not supported: Issue 7728. async/stream_controller_test: Fail # Timer interface not supported: Issue 7728. -async/stream_subscription_cancel_test: Fail # Timer interface not supported: Issue 7728. async/future_constructor2_test: Fail # Timer interface not supported: Issue 7728. mirrors/mirrors_reader_test: Skip # Running in v8 suffices. Issue 16589 - RuntimeError. Issue 22130 - Crash (out of memory).